From 5a007dfb349c9dba5d1b73e800d94a9e4384f7f9 Mon Sep 17 00:00:00 2001 From: Jakub Chrobasik Date: Wed, 2 Aug 2023 09:21:45 +0200 Subject: [PATCH 1/4] refactor: projects_tokens.project_path to projects_tokens.project_slug in token-repository --- .../triplesstore/InMemoryJenaForSpec.scala | 4 +- token-repository/README.md | 2 +- .../TokenRepositoryTypeSerializers.scala | 3 +- .../creation/PersistedSlugFinder.scala | 2 +- .../repository/creation/TokensPersister.scala | 7 +- .../repository/creation/package.scala | 8 +- .../fetching/PersistedTokensFinder.scala | 2 +- .../repository/init/DbInitializer.scala | 9 +-- .../repository/init/MigrationTools.scala | 1 - .../repository/init/ProjectPathAdder.scala | 4 +- .../repository/init/ProjectPathToSlug.scala | 58 +++++++++++++++ .../repository/init/TokensMigrator.scala | 74 ++++++++++++++----- .../InMemoryProjectsTokensDbSpec.scala | 4 +- .../creation/TokensPersisterSpec.scala | 2 +- .../repository/init/DbInitSpec.scala | 1 + .../repository/init/DbMigrations.scala | 2 +- ...rSpec.scala => ProjectPathAdderSpec.scala} | 4 +- .../init/ProjectPathToSlugSpec.scala | 65 ++++++++++++++++ .../repository/init/TokensMigratorSpec.scala | 43 ++++++----- 19 files changed, 226 insertions(+), 69 deletions(-) create mode 100644 token-repository/src/main/scala/io/renku/tokenrepository/repository/init/ProjectPathToSlug.scala rename token-repository/src/test/scala/io/renku/tokenrepository/repository/init/{ProjectSlugAdderSpec.scala => ProjectPathAdderSpec.scala} (97%) create mode 100644 token-repository/src/test/scala/io/renku/tokenrepository/repository/init/ProjectPathToSlugSpec.scala diff --git a/graph-commons/src/test/scala/io/renku/triplesstore/InMemoryJenaForSpec.scala b/graph-commons/src/test/scala/io/renku/triplesstore/InMemoryJenaForSpec.scala index 8772b66822..27b7b2f910 100644 --- a/graph-commons/src/test/scala/io/renku/triplesstore/InMemoryJenaForSpec.scala +++ b/graph-commons/src/test/scala/io/renku/triplesstore/InMemoryJenaForSpec.scala @@ -35,8 +35,6 @@ trait InMemoryJenaForSpec extends ForAllTestContainer with InMemoryJena with Bef def clearDatasetsBefore: Boolean = true before { - if (clearDatasetsBefore) { - clearAllDatasets() - } + if (clearDatasetsBefore) clearAllDatasets() } } diff --git a/token-repository/README.md b/token-repository/README.md index 8177a7c26d..94e03b89fa 100644 --- a/token-repository/README.md +++ b/token-repository/README.md @@ -133,7 +133,7 @@ token-repository uses relational database as an internal storage. The DB has the | column | type | constraints | |--------------|-------------|-------------| | project_id | INT4 | PK NOT NULL | -| project_path | VARCHAR | NOT NULL | +| project_slug | VARCHAR | NOT NULL | | token | VARCHAR | NOT NULL | | created_at | TIMESTAMPTZ | NOT NULL | | expiry_date | DATE | NOT NULL | diff --git a/token-repository/src/main/scala/io/renku/tokenrepository/repository/TokenRepositoryTypeSerializers.scala b/token-repository/src/main/scala/io/renku/tokenrepository/repository/TokenRepositoryTypeSerializers.scala index f5a7222803..22a5d30735 100644 --- a/token-repository/src/main/scala/io/renku/tokenrepository/repository/TokenRepositoryTypeSerializers.scala +++ b/token-repository/src/main/scala/io/renku/tokenrepository/repository/TokenRepositoryTypeSerializers.scala @@ -30,8 +30,7 @@ trait TokenRepositoryTypeSerializers { val projectIdEncoder: Encoder[projects.GitLabId] = int4.values.contramap(_.value) val projectSlugDecoder: Decoder[projects.Slug] = varchar.map(projects.Slug.apply) - val projectSlugEncoder: Encoder[projects.Slug] = - varchar.values.contramap((b: projects.Slug) => b.value) + val projectSlugEncoder: Encoder[projects.Slug] = varchar.values.contramap((b: projects.Slug) => b.value) private[repository] val encryptedAccessTokenDecoder: Decoder[EncryptedAccessToken] = varchar.emap(s => EncryptedAccessToken.from(s).leftMap(_.getMessage)) diff --git a/token-repository/src/main/scala/io/renku/tokenrepository/repository/creation/PersistedSlugFinder.scala b/token-repository/src/main/scala/io/renku/tokenrepository/repository/creation/PersistedSlugFinder.scala index 8bcdcf7237..9307c077c3 100644 --- a/token-repository/src/main/scala/io/renku/tokenrepository/repository/creation/PersistedSlugFinder.scala +++ b/token-repository/src/main/scala/io/renku/tokenrepository/repository/creation/PersistedSlugFinder.scala @@ -50,7 +50,7 @@ private class PersistedSlugFinderImpl[F[_]: MonadCancelThrow: SessionResource: Q SqlStatement .named("find slug for token") .select[projects.GitLabId, projects.Slug]( - sql"""SELECT project_path + sql"""SELECT project_slug FROM projects_tokens WHERE project_id = $projectIdEncoder""" .query(projectSlugDecoder) diff --git a/token-repository/src/main/scala/io/renku/tokenrepository/repository/creation/TokensPersister.scala b/token-repository/src/main/scala/io/renku/tokenrepository/repository/creation/TokensPersister.scala index 509a61ad33..9244eed95d 100644 --- a/token-repository/src/main/scala/io/renku/tokenrepository/repository/creation/TokensPersister.scala +++ b/token-repository/src/main/scala/io/renku/tokenrepository/repository/creation/TokensPersister.scala @@ -28,7 +28,6 @@ import cats.syntax.all._ import io.renku.db.{DbClient, SqlStatement} import io.renku.graph.model.projects.{GitLabId, Slug} import io.renku.tokenrepository.repository.metrics.QueriesExecutionTimes -import org.typelevel.twiddles.syntax._ import skunk._ import skunk.data.Completion import skunk.data.Completion.Delete @@ -64,7 +63,7 @@ private class TokensPersisterImpl[F[_]: MonadCancelThrow: SessionResource: Queri .named("associate token - delete") .command[GitLabId *: Slug *: EmptyTuple](sql""" DELETE FROM projects_tokens - WHERE project_id = $projectIdEncoder OR project_path = $projectSlugEncoder + WHERE project_id = $projectIdEncoder OR project_slug = $projectSlugEncoder """.command) .arguments(project.id, project.slug) .build @@ -81,7 +80,7 @@ private class TokensPersisterImpl[F[_]: MonadCancelThrow: SessionResource: Queri SqlStatement .named("associate token - insert") .command[GitLabId *: Slug *: EncryptedAccessToken *: CreatedAt *: ExpiryDate *: EmptyTuple](sql""" - INSERT INTO projects_tokens (project_id, project_path, token, created_at, expiry_date) + INSERT INTO projects_tokens (project_id, project_slug, token, created_at, expiry_date) VALUES ($projectIdEncoder, $projectSlugEncoder, $encryptedAccessTokenEncoder, $createdAtEncoder, $expiryDateEncoder) """.command) .arguments( @@ -101,7 +100,7 @@ private class TokensPersisterImpl[F[_]: MonadCancelThrow: SessionResource: Queri .named("associate token - update slug") .command[Slug *: GitLabId *: EmptyTuple](sql""" UPDATE projects_tokens - SET project_path = $projectSlugEncoder + SET project_slug = $projectSlugEncoder WHERE project_id = $projectIdEncoder """.command) .arguments(project.slug -> project.id) diff --git a/token-repository/src/main/scala/io/renku/tokenrepository/repository/creation/package.scala b/token-repository/src/main/scala/io/renku/tokenrepository/repository/creation/package.scala index de9f9c7d25..d0f694cbb3 100644 --- a/token-repository/src/main/scala/io/renku/tokenrepository/repository/creation/package.scala +++ b/token-repository/src/main/scala/io/renku/tokenrepository/repository/creation/package.scala @@ -27,12 +27,12 @@ package object creation { import io.renku.tokenrepository.repository.creation.TokenDates._ - private[creation] val createdAtDecoder: Decoder[CreatedAt] = + private[repository] val createdAtDecoder: Decoder[CreatedAt] = timestamptz.map(timestamp => CreatedAt(timestamp.toInstant)) - private[creation] val createdAtEncoder: Encoder[CreatedAt] = timestamptz.values.contramap((b: CreatedAt) => + private[repository] val createdAtEncoder: Encoder[CreatedAt] = timestamptz.values.contramap((b: CreatedAt) => OffsetDateTime.ofInstant(b.value, b.value.atOffset(ZoneOffset.UTC).toZonedDateTime.getZone) ) - private[creation] val expiryDateDecoder: Decoder[ExpiryDate] = date.map(date => ExpiryDate(date)) - private[creation] val expiryDateEncoder: Encoder[ExpiryDate] = date.values.contramap((b: ExpiryDate) => b.value) + private[repository] val expiryDateDecoder: Decoder[ExpiryDate] = date.map(date => ExpiryDate(date)) + private[repository] val expiryDateEncoder: Encoder[ExpiryDate] = date.values.contramap((b: ExpiryDate) => b.value) } diff --git a/token-repository/src/main/scala/io/renku/tokenrepository/repository/fetching/PersistedTokensFinder.scala b/token-repository/src/main/scala/io/renku/tokenrepository/repository/fetching/PersistedTokensFinder.scala index bc9f1fb71b..7ad2998206 100644 --- a/token-repository/src/main/scala/io/renku/tokenrepository/repository/fetching/PersistedTokensFinder.scala +++ b/token-repository/src/main/scala/io/renku/tokenrepository/repository/fetching/PersistedTokensFinder.scala @@ -58,7 +58,7 @@ private class PersistedTokensFinderImpl[F[_]: MonadCancelThrow: SessionResource: override def findStoredToken(projectSlug: Slug): OptionT[F, EncryptedAccessToken] = run { SqlStatement(name = "find token - slug") .select[Slug, EncryptedAccessToken]( - sql"select token from projects_tokens where project_path = $projectSlugEncoder" + sql"select token from projects_tokens where project_slug = $projectSlugEncoder" .query(encryptedAccessTokenDecoder) ) .arguments(projectSlug) diff --git a/token-repository/src/main/scala/io/renku/tokenrepository/repository/init/DbInitializer.scala b/token-repository/src/main/scala/io/renku/tokenrepository/repository/init/DbInitializer.scala index 9f8ae784e9..e3df39e8a1 100644 --- a/token-repository/src/main/scala/io/renku/tokenrepository/repository/init/DbInitializer.scala +++ b/token-repository/src/main/scala/io/renku/tokenrepository/repository/init/DbInitializer.scala @@ -37,10 +37,8 @@ class DbInitializerImpl[F[_]: Async: Logger](migrators: List[DBMigration[F]], ) extends DbInitializer[F] { override def run: F[Unit] = { - for { - _ <- migrators.map(_.run).sequence - _ <- Logger[F].info("Projects Tokens database initialization success") - } yield () + migrators.map(_.run).sequence >> + Logger[F].info("Projects Tokens database initialization success") } recoverWith logAndRetry private def logAndRetry: PartialFunction[Throwable, F[Unit]] = { case NonFatal(exception) => @@ -61,7 +59,8 @@ object DbInitializer { DuplicateProjectsRemover[F].pure[F], ExpiryAndCreatedDatesAdder[F].pure[F], TokensMigrator[F], - ExpiryAndCreatedDatesNotNull[F].pure[F] + ExpiryAndCreatedDatesNotNull[F].pure[F], + ProjectPathToSlug[F].pure[F] ).sequence def apply[F[_]: Async: GitLabClient: Logger: SessionResource: QueriesExecutionTimes]: F[DbInitializer[F]] = diff --git a/token-repository/src/main/scala/io/renku/tokenrepository/repository/init/MigrationTools.scala b/token-repository/src/main/scala/io/renku/tokenrepository/repository/init/MigrationTools.scala index 4504c2d70a..2526d00cc6 100644 --- a/token-repository/src/main/scala/io/renku/tokenrepository/repository/init/MigrationTools.scala +++ b/token-repository/src/main/scala/io/renku/tokenrepository/repository/init/MigrationTools.scala @@ -22,7 +22,6 @@ import cats.MonadThrow import cats.data.Kleisli import cats.effect.MonadCancelThrow import cats.syntax.all._ -import org.typelevel.twiddles.syntax._ import skunk._ import skunk.codec.all._ import skunk.implicits._ diff --git a/token-repository/src/main/scala/io/renku/tokenrepository/repository/init/ProjectPathAdder.scala b/token-repository/src/main/scala/io/renku/tokenrepository/repository/init/ProjectPathAdder.scala index 0ef1e4ac1b..47ed832c0b 100644 --- a/token-repository/src/main/scala/io/renku/tokenrepository/repository/init/ProjectPathAdder.scala +++ b/token-repository/src/main/scala/io/renku/tokenrepository/repository/init/ProjectPathAdder.scala @@ -58,7 +58,7 @@ private class ProjectPathAdder[F[_]: Spawn: Logger: SessionResource] } private lazy val logging: PartialFunction[Throwable, F[Unit]] = { case NonFatal(exception) => - Logger[F].error(exception)("'project_path' column adding failure") - exception.raiseError[F, Unit] + Logger[F].error(exception)("'project_path' column adding failure") >> + exception.raiseError[F, Unit] } } diff --git a/token-repository/src/main/scala/io/renku/tokenrepository/repository/init/ProjectPathToSlug.scala b/token-repository/src/main/scala/io/renku/tokenrepository/repository/init/ProjectPathToSlug.scala new file mode 100644 index 0000000000..2656070834 --- /dev/null +++ b/token-repository/src/main/scala/io/renku/tokenrepository/repository/init/ProjectPathToSlug.scala @@ -0,0 +1,58 @@ +/* + * Copyright 2023 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.tokenrepository.repository.init + +import MigrationTools._ +import cats.data.Kleisli +import cats.effect.{Async, Spawn} +import cats.syntax.all._ +import io.renku.tokenrepository.repository.ProjectsTokensDB.SessionResource +import io.renku.tokenrepository.repository.TokenRepositoryTypeSerializers +import org.typelevel.log4cats.Logger +import skunk._ +import skunk.implicits._ + +private object ProjectPathToSlug { + def apply[F[_]: Async: Logger: SessionResource]: DBMigration[F] = new ProjectPathToSlug[F] +} + +private class ProjectPathToSlug[F[_]: Spawn: Logger: SessionResource] + extends DBMigration[F] + with TokenRepositoryTypeSerializers { + + override def run: F[Unit] = SessionResource[F].useK { + checkColumnExists("projects_tokens", "project_slug") >>= { + case true => Kleisli.liftF(Logger[F].info("'project_slug' column existed")) + case false => renameColumn() + } + } + + private def renameColumn(): Kleisli[F, Session[F], Unit] = Kleisli { implicit session => + { + for { + _ <- execute(sql"ALTER TABLE projects_tokens RENAME COLUMN project_path TO project_slug".command) + _ <- execute(sql"ALTER INDEX IF EXISTS idx_project_path RENAME TO idx_project_slug".command) + _ <- Logger[F].info("column 'project_path' renamed to 'project_slug'") + } yield () + } onError log + } + + private lazy val log: PartialFunction[Throwable, F[Unit]] = + Logger[F].error(_)("renaming column 'project_path' to 'project_slug' failed") +} diff --git a/token-repository/src/main/scala/io/renku/tokenrepository/repository/init/TokensMigrator.scala b/token-repository/src/main/scala/io/renku/tokenrepository/repository/init/TokensMigrator.scala index b0e363b78d..3e182f224a 100644 --- a/token-repository/src/main/scala/io/renku/tokenrepository/repository/init/TokensMigrator.scala +++ b/token-repository/src/main/scala/io/renku/tokenrepository/repository/init/TokensMigrator.scala @@ -21,8 +21,10 @@ package init import AccessTokenCrypto.EncryptedAccessToken import ProjectsTokensDB.SessionResource +import cats.data.Kleisli import cats.effect.{Async, Temporal} import cats.syntax.all._ +import creation.TokenDates.{CreatedAt, ExpiryDate} import creation._ import deletion.PersistedTokenRemover import io.renku.db.DbClient @@ -30,34 +32,32 @@ import io.renku.graph.model.projects import io.renku.http.client.{AccessToken, GitLabClient} import io.renku.tokenrepository.repository.metrics.QueriesExecutionTimes import org.typelevel.log4cats.Logger -import skunk.~ +import skunk._ +import skunk.data.Completion.Delete import scala.concurrent.duration._ private object TokensMigrator { def apply[F[_]: Async: GitLabClient: SessionResource: Logger: QueriesExecutionTimes]: F[DBMigration[F]] = for { - accessTokenCrypto <- AccessTokenCrypto[F]() - tokenValidator <- TokenValidator[F] - tokenRemover <- PersistedTokenRemover[F].pure[F] - tokensCreator <- NewTokensCreator[F]() - associationPersister <- TokensPersister[F].pure[F] - } yield new TokensMigrator[F](accessTokenCrypto, tokenValidator, tokenRemover, tokensCreator, associationPersister) + accessTokenCrypto <- AccessTokenCrypto[F]() + tokenValidator <- TokenValidator[F] + tokenRemover <- PersistedTokenRemover[F].pure[F] + tokensCreator <- NewTokensCreator[F]() + } yield new TokensMigrator[F](accessTokenCrypto, tokenValidator, tokenRemover, tokensCreator) } private class TokensMigrator[F[_]: Async: SessionResource: Logger: QueriesExecutionTimes]( - tokenCrypto: AccessTokenCrypto[F], - tokenValidator: TokenValidator[F], - tokenRemover: PersistedTokenRemover[F], - tokensCreator: NewTokensCreator[F], - associationPersister: TokensPersister[F], - retryInterval: Duration = 5 seconds + tokenCrypto: AccessTokenCrypto[F], + tokenValidator: TokenValidator[F], + tokenRemover: PersistedTokenRemover[F], + tokensCreator: NewTokensCreator[F], + retryInterval: Duration = 5 seconds ) extends DbClient[F](Some(QueriesExecutionTimes[F])) with DBMigration[F] with TokenRepositoryTypeSerializers { private val logPrefix = "token migration:" - import associationPersister._ import fs2.Stream import io.renku.db.SqlStatement import skunk.Void @@ -128,11 +128,49 @@ private class TokensMigrator[F[_]: Async: SessionResource: Logger: QueriesExecut private def persistWithRetry(project: Project, newTokenInfo: TokenCreationInfo, encryptedToken: EncryptedAccessToken - ): F[Unit] = - persistToken(TokenStoringInfo(project, encryptedToken, newTokenInfo.dates)) - .recoverWith(retry(persistWithRetry(project, newTokenInfo, encryptedToken))(project)) + ): F[Unit] = SessionResource[F].useK { + Kleisli { session => + (deleteToken(project) >> persistToken(project, encryptedToken, newTokenInfo.dates)) + .run(session) + .recoverWith(retry(persistWithRetry(project, newTokenInfo, encryptedToken))(project)) + } + } + + private def deleteToken(project: Project) = measureExecutionTime { + SqlStatement + .named(s"$logPrefix: delete token") + .command[projects.GitLabId *: projects.Slug *: EmptyTuple](sql""" + DELETE FROM projects_tokens + WHERE project_id = $projectIdEncoder OR project_path = $projectSlugEncoder + """.command) + .arguments(project.id, project.slug) + .build + .flatMapResult { + case Delete(_) => ().pure[F] + case completion => + new Exception( + show"$logPrefix $project deleting token for projectId = ${project.id}, projectSlug = ${project.slug} failed: $completion" + ).raiseError[F, Unit] + } + } + + private def persistToken(project: Project, encryptedToken: EncryptedAccessToken, newTokenDates: TokenDates) = + measureExecutionTime { + SqlStatement + .named[F](s"$logPrefix: persist new token") + .command[projects.GitLabId *: projects.Slug *: EncryptedAccessToken *: CreatedAt *: ExpiryDate *: EmptyTuple]( + sql"""INSERT INTO projects_tokens (project_id, project_path, token, created_at, expiry_date) + VALUES ($projectIdEncoder, $projectSlugEncoder, $encryptedAccessTokenEncoder, $createdAtEncoder, $expiryDateEncoder) + ON CONFLICT DO NOTHING""".command + ) + .arguments( + project.id *: project.slug *: encryptedToken *: newTokenDates.createdAt *: newTokenDates.expiryDate *: EmptyTuple + ) + .build + .void + } - private def retry[O](thunk: => F[O])(project: Project): PartialFunction[Throwable, F[O]] = { case ex: Exception => + private def retry[O](thunk: => F[O])(project: Project): PartialFunction[Throwable, F[O]] = { case ex => Logger[F].error(ex)(show"$logPrefix $project failure; retrying") >> Temporal[F].delayBy(thunk, retryInterval) } diff --git a/token-repository/src/test/scala/io/renku/tokenrepository/repository/InMemoryProjectsTokensDbSpec.scala b/token-repository/src/test/scala/io/renku/tokenrepository/repository/InMemoryProjectsTokensDbSpec.scala index 409f3c8b07..06f1c90920 100644 --- a/token-repository/src/test/scala/io/renku/tokenrepository/repository/InMemoryProjectsTokensDbSpec.scala +++ b/token-repository/src/test/scala/io/renku/tokenrepository/repository/InMemoryProjectsTokensDbSpec.scala @@ -58,7 +58,7 @@ trait InMemoryProjectsTokensDbSpec extends DbSpec with InMemoryProjectsTokensDb ): Unit = execute { Kleisli[IO, Session[IO], Unit] { session => val query: Command[Int *: String *: String *: OffsetDateTime *: LocalDate *: EmptyTuple] = - sql"""insert into projects_tokens (project_id, project_path, token, created_at, expiry_date) + sql"""insert into projects_tokens (project_id, project_slug, token, created_at, expiry_date) values ($int4, $varchar, $varchar, $timestamptz, $date) """.command session @@ -102,7 +102,7 @@ trait InMemoryProjectsTokensDbSpec extends DbSpec with InMemoryProjectsTokensDb protected def findToken(projectSlug: Slug): Option[String] = sessionResource .useK { - val query: Query[String, String] = sql"select token from projects_tokens where project_path = $varchar" + val query: Query[String, String] = sql"select token from projects_tokens where project_slug = $varchar" .query(varchar) Kleisli(_.prepare(query).flatMap(_.option(projectSlug.value))) } diff --git a/token-repository/src/test/scala/io/renku/tokenrepository/repository/creation/TokensPersisterSpec.scala b/token-repository/src/test/scala/io/renku/tokenrepository/repository/creation/TokensPersisterSpec.scala index fca97c91a0..9f2a89ae1e 100644 --- a/token-repository/src/test/scala/io/renku/tokenrepository/repository/creation/TokensPersisterSpec.scala +++ b/token-repository/src/test/scala/io/renku/tokenrepository/repository/creation/TokensPersisterSpec.scala @@ -146,7 +146,7 @@ class TokensPersisterSpec private def findTokenInfo(projectId: projects.GitLabId): Option[TokenStoringInfo] = sessionResource .useK { val query: Query[projects.GitLabId, TokenStoringInfo] = sql""" - SELECT project_id, project_path, token, created_at, expiry_date + SELECT project_id, project_slug, token, created_at, expiry_date FROM projects_tokens WHERE project_id = $projectIdEncoder""" .query( diff --git a/token-repository/src/test/scala/io/renku/tokenrepository/repository/init/DbInitSpec.scala b/token-repository/src/test/scala/io/renku/tokenrepository/repository/init/DbInitSpec.scala index 59b96ac9eb..d7eb3730b8 100644 --- a/token-repository/src/test/scala/io/renku/tokenrepository/repository/init/DbInitSpec.scala +++ b/token-repository/src/test/scala/io/renku/tokenrepository/repository/init/DbInitSpec.scala @@ -42,6 +42,7 @@ trait DbInitSpec extends InMemoryProjectsTokensDb with DbMigrations with BeforeA before { findAllTables() foreach dropTable migrationsToRun.map(_.run).sequence.unsafeRunSync() + logger.reset() } private def findAllTables(): List[String] = execute { diff --git a/token-repository/src/test/scala/io/renku/tokenrepository/repository/init/DbMigrations.scala b/token-repository/src/test/scala/io/renku/tokenrepository/repository/init/DbMigrations.scala index 89081263b0..e81c06b7a0 100644 --- a/token-repository/src/test/scala/io/renku/tokenrepository/repository/init/DbMigrations.scala +++ b/token-repository/src/test/scala/io/renku/tokenrepository/repository/init/DbMigrations.scala @@ -31,7 +31,7 @@ trait DbMigrations { self: InMemoryProjectsTokensDb with IOSpec with MockFactory => implicit lazy val logger: TestLogger[IO] = TestLogger[IO]() - private implicit lazy val glClient: GitLabClient[IO] = mock[GitLabClient[IO]] + implicit lazy val glClient: GitLabClient[IO] = mock[GitLabClient[IO]] private implicit val metricsRegistry: TestMetricsRegistry[IO] = TestMetricsRegistry[IO] implicit val queriesExecTimes: QueriesExecutionTimes[IO] = QueriesExecutionTimes[IO]().unsafeRunSync() diff --git a/token-repository/src/test/scala/io/renku/tokenrepository/repository/init/ProjectSlugAdderSpec.scala b/token-repository/src/test/scala/io/renku/tokenrepository/repository/init/ProjectPathAdderSpec.scala similarity index 97% rename from token-repository/src/test/scala/io/renku/tokenrepository/repository/init/ProjectSlugAdderSpec.scala rename to token-repository/src/test/scala/io/renku/tokenrepository/repository/init/ProjectPathAdderSpec.scala index b23600e0ee..1d9576da26 100644 --- a/token-repository/src/test/scala/io/renku/tokenrepository/repository/init/ProjectSlugAdderSpec.scala +++ b/token-repository/src/test/scala/io/renku/tokenrepository/repository/init/ProjectPathAdderSpec.scala @@ -26,7 +26,7 @@ import org.scalatest.concurrent.{Eventually, IntegrationPatience} import org.scalatest.matchers.should import org.scalatest.wordspec.AnyWordSpec -class ProjectSlugAdderSpec +class ProjectPathAdderSpec extends AnyWordSpec with IOSpec with DbInitSpec @@ -60,8 +60,6 @@ class ProjectSlugAdderSpec } private trait TestCase { - logger.reset() - val projectPathAdder = new ProjectPathAdder[IO] } } diff --git a/token-repository/src/test/scala/io/renku/tokenrepository/repository/init/ProjectPathToSlugSpec.scala b/token-repository/src/test/scala/io/renku/tokenrepository/repository/init/ProjectPathToSlugSpec.scala new file mode 100644 index 0000000000..3f1436a477 --- /dev/null +++ b/token-repository/src/test/scala/io/renku/tokenrepository/repository/init/ProjectPathToSlugSpec.scala @@ -0,0 +1,65 @@ +/* + * Copyright 2023 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.tokenrepository.repository.init + +import cats.effect.IO +import io.renku.interpreters.TestLogger.Level.Info +import io.renku.testtools.IOSpec +import org.scalamock.scalatest.MockFactory +import org.scalatest.concurrent.{Eventually, IntegrationPatience} +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should + +class ProjectPathToSlugSpec + extends AnyFlatSpec + with IOSpec + with DbInitSpec + with Eventually + with IntegrationPatience + with should.Matchers + with MockFactory { + + protected override lazy val migrationsToRun: List[DBMigration[IO]] = allMigrations.takeWhile { + case _: ProjectPathToSlug[IO] => false + case _ => true + } + + it should "rename the 'project_path' column to 'project_slug'" in { + + verifyColumnExists("projects_tokens", "project_path") shouldBe true + verifyColumnExists("projects_tokens", "project_slug") shouldBe false + + projectPathToSlug.run.unsafeRunSync() + + logger.loggedOnly(Info("column 'project_path' renamed to 'project_slug'")) + logger.reset() + + verifyColumnExists("projects_tokens", "project_path") shouldBe false + verifyColumnExists("projects_tokens", "project_slug") shouldBe true + + projectPathToSlug.run.unsafeRunSync() + + logger.loggedOnly(Info("'project_slug' column existed")) + + verifyIndexExists("projects_tokens", "idx_project_path") shouldBe false + verifyIndexExists("projects_tokens", "idx_project_slug") shouldBe true + } + + private lazy val projectPathToSlug = new ProjectPathToSlug[IO] +} diff --git a/token-repository/src/test/scala/io/renku/tokenrepository/repository/init/TokensMigratorSpec.scala b/token-repository/src/test/scala/io/renku/tokenrepository/repository/init/TokensMigratorSpec.scala index 077fce3290..71d27ef97f 100644 --- a/token-repository/src/test/scala/io/renku/tokenrepository/repository/init/TokensMigratorSpec.scala +++ b/token-repository/src/test/scala/io/renku/tokenrepository/repository/init/TokensMigratorSpec.scala @@ -228,30 +228,33 @@ class TokensMigratorSpec extends AnyWordSpec with IOSpec with DbInitSpec with sh val oldTokenEncrypted = encryptedAccessTokens.generateOne implicit val logger: TestLogger[IO] = TestLogger[IO]() - private val tokenCrypto = mock[AccessTokenCrypto[IO]] - private val tokenValidator = mock[TokenValidator[IO]] - private val tokenRemover = PersistedTokenRemover[IO] - private val tokensCreator = mock[NewTokensCreator[IO]] - private val tokensPersister = TokensPersister[IO] - val migration = new TokensMigrator[IO](tokenCrypto, - tokenValidator, - tokenRemover, - tokensCreator, - tokensPersister, - retryInterval = 500 millis - ) + private val tokenCrypto = mock[AccessTokenCrypto[IO]] + private val tokenValidator = mock[TokenValidator[IO]] + private val tokenRemover = PersistedTokenRemover[IO] + private val tokensCreator = mock[NewTokensCreator[IO]] + val migration = + new TokensMigrator[IO](tokenCrypto, tokenValidator, tokenRemover, tokensCreator, retryInterval = 500 millis) val logPrefix = "token migration:" def insert(project: Project, encryptedToken: EncryptedAccessToken) = - tokensPersister - .persistToken( - TokenStoringInfo(project, - encryptedToken, - TokenDates(timestampsNotInTheFuture.generateAs(CreatedAt), localDates.generateAs(ExpiryDate)) - ) - ) - .unsafeRunSync() + execute[Unit] { + Kleisli[IO, Session[IO], Unit] { session => + val command: Command[ + projects.GitLabId *: projects.Slug *: EncryptedAccessToken *: CreatedAt *: ExpiryDate *: EmptyTuple + ] = sql""" INSERT INTO projects_tokens (project_id, project_path, token, created_at, expiry_date) + VALUES ($projectIdEncoder, $projectSlugEncoder, $encryptedAccessTokenEncoder, $createdAtEncoder, $expiryDateEncoder)""".command + session + .prepare(command) + .flatMap( + _.execute( + project.id *: project.slug *: encryptedToken *: + timestampsNotInTheFuture.generateAs(CreatedAt) *: localDates.generateAs(ExpiryDate) *: EmptyTuple + ) + ) + .void + } + } def insertNonMigrated(project: Project, encryptedToken: EncryptedAccessToken) = execute[Unit] { Kleisli[IO, Session[IO], Unit] { session => From 86fa96f6a11e26548cc9e24f45fb0ed8b7ebbc02 Mon Sep 17 00:00:00 2001 From: Jakub Chrobasik Date: Wed, 2 Aug 2023 16:59:04 +0200 Subject: [PATCH 2/4] refactor: project_path columns renamed to project_slug in event-log --- event-log/README.md | 4 +- .../renku/eventlog/init/DbInitializer.scala | 3 +- .../renku/eventlog/init/MigratorTools.scala | 20 +++++ .../eventlog/init/ProjectPathToSlug.scala | 77 ++++++++++++++++ ...rSpec.scala => ProjectPathAdderSpec.scala} | 2 +- .../eventlog/init/ProjectPathToSlugSpec.scala | 88 +++++++++++++++++++ .../init/ProjectPathToSlugSpec.scala | 27 +++--- 7 files changed, 204 insertions(+), 17 deletions(-) create mode 100644 event-log/src/main/scala/io/renku/eventlog/init/ProjectPathToSlug.scala rename event-log/src/test/scala/io/renku/eventlog/init/{ProjectSlugAdderSpec.scala => ProjectPathAdderSpec.scala} (99%) create mode 100644 event-log/src/test/scala/io/renku/eventlog/init/ProjectPathToSlugSpec.scala diff --git a/event-log/README.md b/event-log/README.md index b206b3e88c..005e4372d0 100644 --- a/event-log/README.md +++ b/event-log/README.md @@ -898,7 +898,7 @@ Event-log uses relational database as an internal storage. The DB has the follow | project | |-------------------------------------------| | project_id INT4 PK NOT NULL | -| project_path VARCHAR NOT NULL | +| project_slug VARCHAR NOT NULL | | latest_event_date TIMESTAMPTZ NOT NULL | | event_payload | @@ -945,7 +945,7 @@ Event-log uses relational database as an internal storage. The DB has the follow | id SERIAL PK NOT NULL | | date TIMESTAMPTZ NOT NULL | | project_id INT4 NOT NULL | -| project_path VARCHAR NOT NULL | +| project_slug VARCHAR NOT NULL | | ts_migration | |---------------------------------------------| diff --git a/event-log/src/main/scala/io/renku/eventlog/init/DbInitializer.scala b/event-log/src/main/scala/io/renku/eventlog/init/DbInitializer.scala index 16cf13aa56..dd682d10dc 100644 --- a/event-log/src/main/scala/io/renku/eventlog/init/DbInitializer.scala +++ b/event-log/src/main/scala/io/renku/eventlog/init/DbInitializer.scala @@ -171,7 +171,8 @@ object DbInitializer { currentStatus = TransformationNonRecoverableFailure, destinationStatus = TriplesGenerated, discardingStatuses = TriplesStore :: Nil - ) + ), + ProjectPathToSlug[F] ) def apply[F[_]: Temporal: Logger: SessionResource](isMigrating: Ref[F, Boolean]): F[DbInitializer[F]] = diff --git a/event-log/src/main/scala/io/renku/eventlog/init/MigratorTools.scala b/event-log/src/main/scala/io/renku/eventlog/init/MigratorTools.scala index 37f8fae893..da1b656efd 100644 --- a/event-log/src/main/scala/io/renku/eventlog/init/MigratorTools.scala +++ b/event-log/src/main/scala/io/renku/eventlog/init/MigratorTools.scala @@ -51,6 +51,26 @@ private object MigratorTools { Kleisli(_.unique(query).recover { case _ => false }) } + def checkIndexExists[F[_]: MonadCancelThrow](index: String): Kleisli[F, Session[F], Boolean] = { + val query: Query[Void, Boolean] = sql""" + SELECT EXISTS ( + SELECT * + FROM pg_indexes + WHERE indexname = '#$index' + )""".query(bool) + Kleisli(_.unique(query).recover { case _ => false }) + } + + def checkIndexExists[F[_]: MonadCancelThrow](table: String, index: String): Kleisli[F, Session[F], Boolean] = { + val query: Query[Void, Boolean] = sql""" + SELECT EXISTS ( + SELECT * + FROM pg_indexes + WHERE tablename = '#$table' AND indexname = '#$index' + )""".query(bool) + Kleisli(_.unique(query).recover { case _ => false }) + } + def execute[F[_]: MonadCancelThrow](sql: Command[Void]): Kleisli[F, Session[F], Unit] = Kleisli(_.execute(sql).void) } diff --git a/event-log/src/main/scala/io/renku/eventlog/init/ProjectPathToSlug.scala b/event-log/src/main/scala/io/renku/eventlog/init/ProjectPathToSlug.scala new file mode 100644 index 0000000000..5d01aecaf2 --- /dev/null +++ b/event-log/src/main/scala/io/renku/eventlog/init/ProjectPathToSlug.scala @@ -0,0 +1,77 @@ +/* + * Copyright 2023 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.eventlog.init + +import cats.data.Kleisli +import cats.effect.MonadCancelThrow +import cats.syntax.all._ +import io.renku.eventlog.EventLogDB.SessionResource +import org.typelevel.log4cats.Logger +import skunk._ +import skunk.implicits._ + +private trait ProjectPathToSlug[F[_]] extends DbMigrator[F] + +private object ProjectPathToSlug { + def apply[F[_]: MonadCancelThrow: Logger: SessionResource]: ProjectPathToSlug[F] = new ProjectPathToSlugImpl[F] +} + +private class ProjectPathToSlugImpl[F[_]: MonadCancelThrow: Logger: SessionResource] extends ProjectPathToSlug[F] { + import MigratorTools._ + + override def run: F[Unit] = SessionResource[F].useK { + checkAndRenameColumnIfNeeded(table = "project") >> + checkAndRenameColumnIfNeeded(table = "clean_up_events_queue") >> + removePathIndex() >> + checkAndCreateSlugIndexIfAbsent(table = "project") >> + checkAndCreateSlugIndexIfAbsent(table = "clean_up_events_queue") + } + + private def checkAndRenameColumnIfNeeded(table: String) = + checkColumnExists(table, "project_slug") >>= { + case true => Kleisli.liftF(Logger[F].info(s"column '$table.project_slug' existed")) + case false => renameColumn(table) + } + + private def renameColumn(table: String): Kleisli[F, Session[F], Unit] = + execute(sql"ALTER TABLE #$table RENAME COLUMN project_path TO project_slug".command) + .flatMapF(_ => Logger[F].info(s"column '$table.project_path' renamed to '$table.project_slug'")) + + private def removePathIndex(): Kleisli[F, Session[F], Unit] = { + val idx = "idx_project_path" + checkIndexExists(idx) >>= { + case false => Kleisli.liftF(Logger[F].info(s"index '$idx' not existed")) + case true => + execute(sql"DROP INDEX IF EXISTS #$idx".command) + .flatMapF(_ => Logger[F].info(s"index '$idx' removed")) + } + } + + private def checkAndCreateSlugIndexIfAbsent(table: String) = { + val idx = s"idx_${table}_project_slug" + checkIndexExists(table, idx) >>= { + case true => Kleisli.liftF(Logger[F].info(s"index '$idx' existed")) + case false => createIndex(table, idx) + } + } + + private def createIndex(table: String, idx: String): Kleisli[F, Session[F], Unit] = + execute(sql"CREATE INDEX IF NOT EXISTS #$idx ON #$table(project_slug)".command) + .flatMapF(_ => Logger[F].info(s"index '$idx' created")) +} diff --git a/event-log/src/test/scala/io/renku/eventlog/init/ProjectSlugAdderSpec.scala b/event-log/src/test/scala/io/renku/eventlog/init/ProjectPathAdderSpec.scala similarity index 99% rename from event-log/src/test/scala/io/renku/eventlog/init/ProjectSlugAdderSpec.scala rename to event-log/src/test/scala/io/renku/eventlog/init/ProjectPathAdderSpec.scala index 9d7c858f4f..9a66522812 100644 --- a/event-log/src/test/scala/io/renku/eventlog/init/ProjectSlugAdderSpec.scala +++ b/event-log/src/test/scala/io/renku/eventlog/init/ProjectPathAdderSpec.scala @@ -39,7 +39,7 @@ import skunk._ import skunk.codec.all._ import skunk.implicits._ -class ProjectSlugAdderSpec +class ProjectPathAdderSpec extends AnyWordSpec with IOSpec with DbInitSpec diff --git a/event-log/src/test/scala/io/renku/eventlog/init/ProjectPathToSlugSpec.scala b/event-log/src/test/scala/io/renku/eventlog/init/ProjectPathToSlugSpec.scala new file mode 100644 index 0000000000..ebda57c7f2 --- /dev/null +++ b/event-log/src/test/scala/io/renku/eventlog/init/ProjectPathToSlugSpec.scala @@ -0,0 +1,88 @@ +/* + * Copyright 2023 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.eventlog.init + +import cats.effect.IO +import io.renku.interpreters.TestLogger +import io.renku.interpreters.TestLogger.Level.Info +import io.renku.testtools.IOSpec +import org.scalatest.concurrent.{Eventually, IntegrationPatience} +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should + +class ProjectPathToSlugSpec + extends AnyFlatSpec + with should.Matchers + with IOSpec + with DbInitSpec + with Eventually + with IntegrationPatience { + + protected[init] override lazy val migrationsToRun: List[DbMigrator[IO]] = allMigrations.takeWhile { + case _: ProjectPathToSlugImpl[IO] => false + case _ => true + } + + it should "rename the 'project_path' column to 'project_slug' on the project and clean_up_events_queue tables, " + + "remove the old 'idx_project_path' index and " + + "create new 'idx_project_project_slug' and 'idx_clean_up_events_queue_project_slug' indices" in { + + verifyColumnExists("project", "project_path") shouldBe true + verifyColumnExists("project", "project_slug") shouldBe false + verifyColumnExists("clean_up_events_queue", "project_path") shouldBe true + verifyColumnExists("clean_up_events_queue", "project_slug") shouldBe false + verifyIndexExists("project", "idx_project_path") shouldBe false + verifyIndexExists("project", "idx_project_project_slug") shouldBe false + verifyIndexExists("clean_up_events_queue", "idx_project_path") shouldBe true + verifyIndexExists("clean_up_events_queue", "idx_clean_up_events_queue_project_slug") shouldBe false + + projectPathToSlug.run.unsafeRunSync() + + logger.loggedOnly( + Info("column 'project.project_path' renamed to 'project.project_slug'"), + Info("column 'clean_up_events_queue.project_path' renamed to 'clean_up_events_queue.project_slug'"), + Info("index 'idx_project_path' removed"), + Info("index 'idx_project_project_slug' created"), + Info("index 'idx_clean_up_events_queue_project_slug' created") + ) + logger.reset() + + verifyColumnExists("project", "project_path") shouldBe false + verifyColumnExists("project", "project_slug") shouldBe true + verifyColumnExists("clean_up_events_queue", "project_path") shouldBe false + verifyColumnExists("clean_up_events_queue", "project_slug") shouldBe true + verifyIndexExists("project", "idx_project_path") shouldBe false + verifyIndexExists("project", "idx_project_project_slug") shouldBe true + verifyIndexExists("clean_up_events_queue", "idx_project_path") shouldBe false + verifyIndexExists("clean_up_events_queue", "idx_clean_up_events_queue_project_slug") shouldBe true + + projectPathToSlug.run.unsafeRunSync() + + logger.loggedOnly( + Info("column 'project.project_slug' existed"), + Info("column 'clean_up_events_queue.project_slug' existed"), + Info("index 'idx_project_path' not existed"), + Info("index 'idx_project_project_slug' existed"), + Info("index 'idx_clean_up_events_queue_project_slug' existed") + ) + } + + private implicit lazy val logger: TestLogger[IO] = TestLogger[IO]() + private lazy val projectPathToSlug = new ProjectPathToSlugImpl[IO] +} diff --git a/token-repository/src/test/scala/io/renku/tokenrepository/repository/init/ProjectPathToSlugSpec.scala b/token-repository/src/test/scala/io/renku/tokenrepository/repository/init/ProjectPathToSlugSpec.scala index 3f1436a477..b8274ee4dd 100644 --- a/token-repository/src/test/scala/io/renku/tokenrepository/repository/init/ProjectPathToSlugSpec.scala +++ b/token-repository/src/test/scala/io/renku/tokenrepository/repository/init/ProjectPathToSlugSpec.scala @@ -40,26 +40,27 @@ class ProjectPathToSlugSpec case _ => true } - it should "rename the 'project_path' column to 'project_slug'" in { + it should "rename the 'project_path' column to 'project_slug' " + + "as well as rename the 'idx_project_path' index" in { - verifyColumnExists("projects_tokens", "project_path") shouldBe true - verifyColumnExists("projects_tokens", "project_slug") shouldBe false + verifyColumnExists("projects_tokens", "project_path") shouldBe true + verifyColumnExists("projects_tokens", "project_slug") shouldBe false - projectPathToSlug.run.unsafeRunSync() + projectPathToSlug.run.unsafeRunSync() - logger.loggedOnly(Info("column 'project_path' renamed to 'project_slug'")) - logger.reset() + logger.loggedOnly(Info("column 'project_path' renamed to 'project_slug'")) + logger.reset() - verifyColumnExists("projects_tokens", "project_path") shouldBe false - verifyColumnExists("projects_tokens", "project_slug") shouldBe true + verifyColumnExists("projects_tokens", "project_path") shouldBe false + verifyColumnExists("projects_tokens", "project_slug") shouldBe true - projectPathToSlug.run.unsafeRunSync() + projectPathToSlug.run.unsafeRunSync() - logger.loggedOnly(Info("'project_slug' column existed")) + logger.loggedOnly(Info("'project_slug' column existed")) - verifyIndexExists("projects_tokens", "idx_project_path") shouldBe false - verifyIndexExists("projects_tokens", "idx_project_slug") shouldBe true - } + verifyIndexExists("projects_tokens", "idx_project_path") shouldBe false + verifyIndexExists("projects_tokens", "idx_project_slug") shouldBe true + } private lazy val projectPathToSlug = new ProjectPathToSlug[IO] } From 6557fcb64c3640ed55e17179a9c572b7f5690c92 Mon Sep 17 00:00:00 2001 From: Jakub Chrobasik Date: Wed, 2 Aug 2023 18:31:03 +0200 Subject: [PATCH 3/4] refactor: used the new project_slug column in all EL's queries --- .../eventpayload/EventPayloadFinder.scala | 5 +-- .../renku/eventlog/events/EventsFinder.scala | 14 +++--- .../cleanuprequest/CleanUpEventsQueue.scala | 2 +- .../cleanuprequest/ProjectIdFinder.scala | 2 +- .../commitsyncrequest/CommitSyncForcer.scala | 5 +-- .../consumers/creation/EventPersister.scala | 4 +- .../GlobalCommitSyncForcer.scala | 5 +-- .../alleventstonew/DbUpdater.scala | 2 +- .../DequeuedEventHandler.scala | 14 +++--- .../cleaning/ProjectCleaner.scala | 6 +-- .../DequeuedEventHandler.scala | 4 +- .../awaitinggeneration/EventFinder.scala | 4 +- .../producers/cleanup/EventFinder.scala | 6 +-- .../producers/commitsync/EventFinder.scala | 2 +- .../globalcommitsync/EventFinder.scala | 2 +- .../producers/membersync/EventFinder.scala | 2 +- .../minprojectinfo/EventFinder.scala | 2 +- .../producers/projectsync/EventFinder.scala | 6 +-- .../triplesgenerated/EventFinder.scala | 4 +- .../LongProcessingEventFinder.scala | 2 +- .../LostSubscriberEventFinder.scala | 2 +- .../zombieevents/LostZombieEventFinder.scala | 4 +- .../renku/eventlog/init/DbInitializer.scala | 4 +- .../renku/eventlog/metrics/StatsFinder.scala | 43 +++++++++---------- .../eventlog/CleanUpEventsProvisioning.scala | 4 +- .../eventlog/EventLogDataProvisioning.scala | 19 ++++---- .../creation/EventPersisterSpec.scala | 6 +-- .../init/EventStatusRenamerSpec.scala | 43 ++++++++++++------- 28 files changed, 114 insertions(+), 104 deletions(-) diff --git a/event-log/src/main/scala/io/renku/eventlog/eventpayload/EventPayloadFinder.scala b/event-log/src/main/scala/io/renku/eventlog/eventpayload/EventPayloadFinder.scala index a986d2a5cb..21913f4582 100644 --- a/event-log/src/main/scala/io/renku/eventlog/eventpayload/EventPayloadFinder.scala +++ b/event-log/src/main/scala/io/renku/eventlog/eventpayload/EventPayloadFinder.scala @@ -56,11 +56,10 @@ object EventPayloadFinder { .build(_.option) def selectPayload: Query[EventId *: ProjectSlug *: EmptyTuple, PayloadData] = - sql""" - SELECT ep.payload + sql"""SELECT ep.payload FROM event_payload ep INNER JOIN project p USING (project_id) - WHERE ep.event_id = $eventIdEncoder AND p.project_path = $projectSlugEncoder + WHERE ep.event_id = $eventIdEncoder AND p.project_slug = $projectSlugEncoder """ .query(byteVectorDecoder) .map(PayloadData) diff --git a/event-log/src/main/scala/io/renku/eventlog/events/EventsFinder.scala b/event-log/src/main/scala/io/renku/eventlog/events/EventsFinder.scala index 27e61d460a..c924e9418e 100644 --- a/event-log/src/main/scala/io/renku/eventlog/events/EventsFinder.scala +++ b/event-log/src/main/scala/io/renku/eventlog/events/EventsFinder.scala @@ -65,7 +65,7 @@ private class EventsFinderImpl[F[_]: Async: NonEmptyParallel: SessionResource: Q } private val selectEventInfo: Fragment[Void] = sql""" - SELECT evt.event_id, prj.project_id, prj.project_path, evt.status, evt.event_date, evt.execution_date, evt.message, COUNT(times.status) + SELECT evt.event_id, prj.project_id, prj.project_slug, evt.status, evt.event_date, evt.execution_date, evt.message, COUNT(times.status) FROM event evt """ @@ -75,7 +75,7 @@ private class EventsFinderImpl[F[_]: Async: NonEmptyParallel: SessionResource: Q private val filterByProject: projects.Identifier => AppliedFragment = { case slug: projects.Slug => - val fragment: Fragment[projects.Slug] = sql"""AND prj.project_path = $projectSlugEncoder""" + val fragment: Fragment[projects.Slug] = sql"""AND prj.project_slug = $projectSlugEncoder""" fragment(slug) case id: projects.GitLabId => val fragment: Fragment[projects.GitLabId] = sql"""AND prj.project_id = $projectIdEncoder""" @@ -131,7 +131,7 @@ private class EventsFinderImpl[F[_]: Async: NonEmptyParallel: SessionResource: Q """ private val groupBy: Fragment[Void] = sql""" - GROUP BY evt.event_id, evt.status, evt.event_date, evt.execution_date, evt.message, prj.project_id, prj.project_path + GROUP BY evt.event_id, evt.status, evt.event_date, evt.execution_date, evt.message, prj.project_id, prj.project_slug """ private val orderBy: Sorting[Criteria.Sort.type] => Fragment[Void] = sorting => { @@ -230,7 +230,9 @@ private class EventsFinderImpl[F[_]: Async: NonEmptyParallel: SessionResource: Q sql"""SELECT times.status, times.processing_time FROM status_processing_time times WHERE times.event_id = $eventIdEncoder AND times.project_id = ( - SELECT project_id FROM project WHERE project_path = $projectSlugEncoder + SELECT project_id + FROM project + WHERE project_slug = $projectSlugEncoder ORDER BY project_id DESC LIMIT 1 ) @@ -245,7 +247,7 @@ private class EventsFinderImpl[F[_]: Async: NonEmptyParallel: SessionResource: Q val query: Fragment[projects.Slug] = sql""" SELECT COUNT(DISTINCT evt.event_id) FROM event evt - JOIN project prj ON evt.project_id = prj.project_id AND prj.project_path = $projectSlugEncoder + JOIN project prj ON evt.project_id = prj.project_id AND prj.project_slug = $projectSlugEncoder """ query(projectSlug) |+| whereEventDate(maybeDates) case Criteria(Criteria.Filters.ProjectEvents(projectId: projects.GitLabId, None, maybeDates), _, _) => @@ -259,7 +261,7 @@ private class EventsFinderImpl[F[_]: Async: NonEmptyParallel: SessionResource: Q val query: Fragment[projects.Slug *: EventStatus *: EmptyTuple] = sql""" SELECT COUNT(DISTINCT evt.event_id) FROM event evt - JOIN project prj ON evt.project_id = prj.project_id AND prj.project_path = $projectSlugEncoder + JOIN project prj ON evt.project_id = prj.project_id AND prj.project_slug = $projectSlugEncoder WHERE evt.status = $eventStatusEncoder """ query(projectSlug *: status *: EmptyTuple) |+| andEventDate(maybeDates) diff --git a/event-log/src/main/scala/io/renku/eventlog/events/consumers/cleanuprequest/CleanUpEventsQueue.scala b/event-log/src/main/scala/io/renku/eventlog/events/consumers/cleanuprequest/CleanUpEventsQueue.scala index 78b8758a09..e10a58e04a 100644 --- a/event-log/src/main/scala/io/renku/eventlog/events/consumers/cleanuprequest/CleanUpEventsQueue.scala +++ b/event-log/src/main/scala/io/renku/eventlog/events/consumers/cleanuprequest/CleanUpEventsQueue.scala @@ -54,7 +54,7 @@ private class CleanUpEventsQueueImpl[F[_]: Async: SessionResource: QueriesExecut measureExecutionTime { SqlStatement[F](name = "clean_up_events_queue - offer") .command[OffsetDateTime *: projects.GitLabId *: projects.Slug *: EmptyTuple]( - sql"""INSERT INTO clean_up_events_queue (date, project_id, project_path) + sql"""INSERT INTO clean_up_events_queue (date, project_id, project_slug) VALUES ($timestamptz, $projectIdEncoder, $projectSlugEncoder) ON CONFLICT DO NOTHING """.command diff --git a/event-log/src/main/scala/io/renku/eventlog/events/consumers/cleanuprequest/ProjectIdFinder.scala b/event-log/src/main/scala/io/renku/eventlog/events/consumers/cleanuprequest/ProjectIdFinder.scala index 94af03660d..b319473823 100644 --- a/event-log/src/main/scala/io/renku/eventlog/events/consumers/cleanuprequest/ProjectIdFinder.scala +++ b/event-log/src/main/scala/io/renku/eventlog/events/consumers/cleanuprequest/ProjectIdFinder.scala @@ -48,7 +48,7 @@ private class ProjectIdFinderImpl[F[_]: MonadCancelThrow: SessionResource: Queri .select[projects.Slug, projects.GitLabId](sql""" SELECT project_id FROM project - WHERE project_path = $projectSlugEncoder + WHERE project_slug = $projectSlugEncoder """.query(projectIdDecoder)) .arguments(projectSlug) .build(_.option) diff --git a/event-log/src/main/scala/io/renku/eventlog/events/consumers/commitsyncrequest/CommitSyncForcer.scala b/event-log/src/main/scala/io/renku/eventlog/events/consumers/commitsyncrequest/CommitSyncForcer.scala index 9ca2b880b7..5618ec72ea 100644 --- a/event-log/src/main/scala/io/renku/eventlog/events/consumers/commitsyncrequest/CommitSyncForcer.scala +++ b/event-log/src/main/scala/io/renku/eventlog/events/consumers/commitsyncrequest/CommitSyncForcer.scala @@ -73,10 +73,9 @@ private class CommitSyncForcerImpl[F[_]: MonadCancelThrow: SessionResource: Quer SqlStatement .named(s"${categoryName.value.toLowerCase} - insert project") .command[projects.GitLabId *: projects.Slug *: EventDate *: EmptyTuple](sql""" - INSERT INTO project (project_id, project_path, latest_event_date) + INSERT INTO project (project_id, project_slug, latest_event_date) VALUES ($projectIdEncoder, $projectSlugEncoder, $eventDateEncoder) - ON CONFLICT (project_id) - DO NOTHING + ON CONFLICT (project_id) DO NOTHING """.command) .arguments(projectId *: projectSlug *: EventDate(Instant.EPOCH) *: EmptyTuple) .build diff --git a/event-log/src/main/scala/io/renku/eventlog/events/consumers/creation/EventPersister.scala b/event-log/src/main/scala/io/renku/eventlog/events/consumers/creation/EventPersister.scala index 1cc6d698ff..9ec768849f 100644 --- a/event-log/src/main/scala/io/renku/eventlog/events/consumers/creation/EventPersister.scala +++ b/event-log/src/main/scala/io/renku/eventlog/events/consumers/creation/EventPersister.scala @@ -178,11 +178,11 @@ private class EventPersisterImpl[F[_]: MonadCancelThrow: SessionResource: Querie SqlStatement(name = "new - upsert project") .command[projects.GitLabId *: projects.Slug *: EventDate *: EmptyTuple]( sql""" - INSERT INTO project (project_id, project_path, latest_event_date) + INSERT INTO project (project_id, project_slug, latest_event_date) VALUES ($projectIdEncoder, $projectSlugEncoder, $eventDateEncoder) ON CONFLICT (project_id) DO - UPDATE SET latest_event_date = EXCLUDED.latest_event_date, project_path = EXCLUDED.project_path + UPDATE SET latest_event_date = EXCLUDED.latest_event_date, project_slug = EXCLUDED.project_slug WHERE EXCLUDED.latest_event_date > project.latest_event_date """.command ) diff --git a/event-log/src/main/scala/io/renku/eventlog/events/consumers/globalcommitsyncrequest/GlobalCommitSyncForcer.scala b/event-log/src/main/scala/io/renku/eventlog/events/consumers/globalcommitsyncrequest/GlobalCommitSyncForcer.scala index b94f121983..bed23b2a4e 100644 --- a/event-log/src/main/scala/io/renku/eventlog/events/consumers/globalcommitsyncrequest/GlobalCommitSyncForcer.scala +++ b/event-log/src/main/scala/io/renku/eventlog/events/consumers/globalcommitsyncrequest/GlobalCommitSyncForcer.scala @@ -76,10 +76,9 @@ private class GlobalCommitSyncForcerImpl[F[_]: MonadCancelThrow: SessionResource SqlStatement .named(s"${categoryName.value.toLowerCase} - insert project") .command[projects.GitLabId *: projects.Slug *: EventDate *: EmptyTuple](sql""" - INSERT INTO project (project_id, project_path, latest_event_date) + INSERT INTO project (project_id, project_slug, latest_event_date) VALUES ($projectIdEncoder, $projectSlugEncoder, $eventDateEncoder) - ON CONFLICT (project_id) - DO NOTHING + ON CONFLICT (project_id) DO NOTHING """.command) .arguments(projectId *: projectSlug *: EventDate(Instant.EPOCH) *: EmptyTuple) .build diff --git a/event-log/src/main/scala/io/renku/eventlog/events/consumers/statuschange/alleventstonew/DbUpdater.scala b/event-log/src/main/scala/io/renku/eventlog/events/consumers/statuschange/alleventstonew/DbUpdater.scala index 394b8ea2eb..ff42635f25 100644 --- a/event-log/src/main/scala/io/renku/eventlog/events/consumers/statuschange/alleventstonew/DbUpdater.scala +++ b/event-log/src/main/scala/io/renku/eventlog/events/consumers/statuschange/alleventstonew/DbUpdater.scala @@ -60,7 +60,7 @@ private[statuschange] class DbUpdater[F[_]: Async: QueriesExecutionTimes]( SqlStatement .named("all_to_new - find projects") .select[Void, ProjectEventsToNew]( - sql"""SELECT proj.project_id, proj.project_path + sql"""SELECT proj.project_id, proj.project_slug FROM project proj ORDER BY proj.latest_event_date ASC""" .query(projectIdDecoder ~ projectSlugDecoder) diff --git a/event-log/src/main/scala/io/renku/eventlog/events/consumers/statuschange/projecteventstonew/DequeuedEventHandler.scala b/event-log/src/main/scala/io/renku/eventlog/events/consumers/statuschange/projecteventstonew/DequeuedEventHandler.scala index 56a86c150c..ce5b9e2aab 100644 --- a/event-log/src/main/scala/io/renku/eventlog/events/consumers/statuschange/projecteventstonew/DequeuedEventHandler.scala +++ b/event-log/src/main/scala/io/renku/eventlog/events/consumers/statuschange/projecteventstonew/DequeuedEventHandler.scala @@ -102,7 +102,7 @@ object DequeuedEventHandler { FROM event e JOIN project p ON e.project_id = p.project_id AND p.project_id = $projectIdEncoder - AND p.project_path = $projectSlugEncoder + AND p.project_slug = $projectSlugEncoder WHERE #${`status IN`(EventStatus.all diff Set(Skipped, GeneratingTriples, AwaitingDeletion, Deleting))} FOR UPDATE ) old_evt @@ -125,7 +125,7 @@ object DequeuedEventHandler { FROM status_processing_time t JOIN project p ON t.project_id = p.project_id AND p.project_id = $projectIdEncoder - AND p.project_path = $projectSlugEncoder + AND p.project_slug = $projectSlugEncoder )""".command) .arguments(project.id *: project.slug *: EmptyTuple) .build @@ -141,7 +141,7 @@ object DequeuedEventHandler { FROM event_payload ep JOIN project p ON ep.project_id = p.project_id AND p.project_id = $projectIdEncoder - AND p.project_path = $projectSlugEncoder + AND p.project_slug = $projectSlugEncoder )""".command) .arguments(project.id *: project.slug *: EmptyTuple) .build @@ -158,7 +158,7 @@ object DequeuedEventHandler { FROM event e JOIN project p ON e.project_id = p.project_id AND p.project_id = $projectIdEncoder - AND p.project_path = $projectSlugEncoder + AND p.project_slug = $projectSlugEncoder ) """.command) .arguments(status *: project.id *: project.slug *: EmptyTuple) @@ -179,7 +179,7 @@ object DequeuedEventHandler { FROM event e JOIN project p ON e.project_id = p.project_id AND p.project_id = $projectIdEncoder - AND p.project_path = $projectSlugEncoder + AND p.project_slug = $projectSlugEncoder WHERE e.status = '#${GeneratingTriples.value}' )""".command) .arguments(project.id *: project.id *: project.slug *: EmptyTuple) @@ -197,7 +197,7 @@ object DequeuedEventHandler { FROM subscription_category_sync_time st JOIN project p ON st.project_id = p.project_id AND p.project_id = $projectIdEncoder - AND p.project_path = $projectSlugEncoder + AND p.project_slug = $projectSlugEncoder ) """.command) .arguments(project.id *: project.slug *: EmptyTuple) @@ -212,7 +212,7 @@ object DequeuedEventHandler { FROM event e JOIN project p ON e.project_id = p.project_id AND p.project_id = $projectIdEncoder - AND p.project_path = $projectSlugEncoder + AND p.project_slug = $projectSlugEncoder ORDER BY event_date DESC LIMIT 1""".query(eventDateDecoder)) .arguments(project.id *: project.slug *: EmptyTuple) diff --git a/event-log/src/main/scala/io/renku/eventlog/events/consumers/statuschange/projecteventstonew/cleaning/ProjectCleaner.scala b/event-log/src/main/scala/io/renku/eventlog/events/consumers/statuschange/projecteventstonew/cleaning/ProjectCleaner.scala index f206b5fc0e..c3099c77de 100644 --- a/event-log/src/main/scala/io/renku/eventlog/events/consumers/statuschange/projecteventstonew/cleaning/ProjectCleaner.scala +++ b/event-log/src/main/scala/io/renku/eventlog/events/consumers/statuschange/projecteventstonew/cleaning/ProjectCleaner.scala @@ -87,7 +87,7 @@ private[statuschange] class ProjectCleanerImpl[F[_]: Async: Logger: QueriesExecu SqlStatement(name = "project_to_new - clean_up_events_queue removal") .command[projects.GitLabId *: projects.Slug *: EmptyTuple](sql""" DELETE FROM clean_up_events_queue - WHERE project_id = $projectIdEncoder AND project_path = $projectSlugEncoder""".command) + WHERE project_id = $projectIdEncoder AND project_slug = $projectSlugEncoder""".command) .arguments(project.id *: project.slug *: EmptyTuple) .build .void @@ -102,7 +102,7 @@ private[statuschange] class ProjectCleanerImpl[F[_]: Async: Logger: QueriesExecu FROM subscription_category_sync_time st JOIN project p ON st.project_id = p.project_id AND p.project_id = $projectIdEncoder - AND p.project_path = $projectSlugEncoder + AND p.project_slug = $projectSlugEncoder )""".command) .arguments(project.id *: project.slug *: EmptyTuple) .build @@ -113,7 +113,7 @@ private[statuschange] class ProjectCleanerImpl[F[_]: Async: Logger: QueriesExecu SqlStatement(name = "project_to_new - remove project") .command[projects.GitLabId *: projects.Slug *: EmptyTuple](sql""" DELETE FROM project - WHERE project_id = $projectIdEncoder AND project_path = $projectSlugEncoder""".command) + WHERE project_id = $projectIdEncoder AND project_slug = $projectSlugEncoder""".command) .arguments(project.id *: project.slug *: EmptyTuple) .build .mapResult { diff --git a/event-log/src/main/scala/io/renku/eventlog/events/consumers/statuschange/redoprojecttransformation/DequeuedEventHandler.scala b/event-log/src/main/scala/io/renku/eventlog/events/consumers/statuschange/redoprojecttransformation/DequeuedEventHandler.scala index 5a99041183..64c35dd2a6 100644 --- a/event-log/src/main/scala/io/renku/eventlog/events/consumers/statuschange/redoprojecttransformation/DequeuedEventHandler.scala +++ b/event-log/src/main/scala/io/renku/eventlog/events/consumers/statuschange/redoprojecttransformation/DequeuedEventHandler.scala @@ -66,7 +66,7 @@ private class DequeuedEventHandlerImpl[F[_]: Async: QueriesExecutionTimes]( FROM ( SELECT e.event_id, e.project_id FROM event e - JOIN project p ON e.project_id = p.project_id AND p.project_path = $projectSlugEncoder + JOIN project p ON e.project_id = p.project_id AND p.project_slug = $projectSlugEncoder WHERE e.status = '#${TriplesStore.value}' ORDER BY event_date DESC LIMIT 1 @@ -119,7 +119,7 @@ private class DequeuedEventHandlerImpl[F[_]: Async: QueriesExecutionTimes]( AND project_id = ( SELECT project_id FROM project - WHERE project_path = $projectSlugEncoder + WHERE project_slug = $projectSlugEncoder ) """.command ) diff --git a/event-log/src/main/scala/io/renku/eventlog/events/producers/awaitinggeneration/EventFinder.scala b/event-log/src/main/scala/io/renku/eventlog/events/producers/awaitinggeneration/EventFinder.scala index abae6b40a8..e56f7a464c 100644 --- a/event-log/src/main/scala/io/renku/eventlog/events/producers/awaitinggeneration/EventFinder.scala +++ b/event-log/src/main/scala/io/renku/eventlog/events/producers/awaitinggeneration/EventFinder.scala @@ -83,7 +83,7 @@ private class EventFinderImpl[F[_]: Async: Parallel: SessionResource: QueriesExe .named(s"${SubscriptionCategory.categoryName.value.toLowerCase} - find projects") .select[ExecutionDate *: ExecutionDate *: Int *: EmptyTuple, ProjectInfo]( sql""" - SELECT p.project_id, p.project_path, p.latest_event_date, + SELECT p.project_id, p.project_slug, p.latest_event_date, (SELECT count(event_id) FROM event evt_int WHERE evt_int.project_id = p.project_id AND evt_int.status = '#${GeneratingTriples.value}') AS current_occupancy FROM ( SELECT DISTINCT project_id, MAX(event_date) AS max_event_date @@ -140,7 +140,7 @@ private class EventFinderImpl[F[_]: Async: Parallel: SessionResource: QueriesExe .select[projects.Slug *: projects.GitLabId *: ExecutionDate *: ExecutionDate *: EmptyTuple, AwaitingGenerationEvent ](sql""" - SELECT evt.event_id, evt.project_id, $projectSlugEncoder AS project_path, evt.event_body + SELECT evt.event_id, evt.project_id, $projectSlugEncoder AS project_slug, evt.event_body FROM ( SELECT project_id, max(event_date) AS max_event_date FROM event diff --git a/event-log/src/main/scala/io/renku/eventlog/events/producers/cleanup/EventFinder.scala b/event-log/src/main/scala/io/renku/eventlog/events/producers/cleanup/EventFinder.scala index 43a49879f6..3b0cfba1c9 100644 --- a/event-log/src/main/scala/io/renku/eventlog/events/producers/cleanup/EventFinder.scala +++ b/event-log/src/main/scala/io/renku/eventlog/events/producers/cleanup/EventFinder.scala @@ -72,7 +72,7 @@ private class EventFinderImpl[F[_]: Async: Parallel: SessionResource: Logger: Qu SqlStatement .named(s"${categoryName.show.toLowerCase} - find event in queue") .select[Void, Project](sql""" - SELECT queue.project_id, queue.project_path + SELECT queue.project_id, queue.project_slug FROM clean_up_events_queue queue ORDER BY queue.date ASC LIMIT 1 @@ -85,7 +85,7 @@ private class EventFinderImpl[F[_]: Async: Parallel: SessionResource: Logger: Qu SqlStatement .named(s"${categoryName.show.toLowerCase} - find event") .select[ExecutionDate, Project](sql""" - SELECT evt.project_id, prj.project_path + SELECT evt.project_id, prj.project_slug FROM event evt JOIN project prj ON prj.project_id = evt.project_id WHERE evt.status = '#${AwaitingDeletion.value}' @@ -104,7 +104,7 @@ private class EventFinderImpl[F[_]: Async: Parallel: SessionResource: Logger: Qu .named(s"${categoryName.show.toLowerCase} - delete clean-up event") .command[projects.Slug](sql""" DELETE FROM clean_up_events_queue - WHERE project_path = $projectSlugEncoder + WHERE project_slug = $projectSlugEncoder """.command) .arguments(projectSlug) .build diff --git a/event-log/src/main/scala/io/renku/eventlog/events/producers/commitsync/EventFinder.scala b/event-log/src/main/scala/io/renku/eventlog/events/producers/commitsync/EventFinder.scala index 6375668455..8a8505122d 100644 --- a/event-log/src/main/scala/io/renku/eventlog/events/producers/commitsync/EventFinder.scala +++ b/event-log/src/main/scala/io/renku/eventlog/events/producers/commitsync/EventFinder.scala @@ -77,7 +77,7 @@ private class EventFinderImpl[F[_]: MonadCancelThrow: SessionResource: QueriesEx LIMIT 1 ) event_status, proj.project_id, - proj.project_path, + proj.project_slug, sync_time.last_synced, proj.latest_event_date FROM project proj diff --git a/event-log/src/main/scala/io/renku/eventlog/events/producers/globalcommitsync/EventFinder.scala b/event-log/src/main/scala/io/renku/eventlog/events/producers/globalcommitsync/EventFinder.scala index 0802e63a54..989fbd8a77 100644 --- a/event-log/src/main/scala/io/renku/eventlog/events/producers/globalcommitsync/EventFinder.scala +++ b/event-log/src/main/scala/io/renku/eventlog/events/producers/globalcommitsync/EventFinder.scala @@ -71,7 +71,7 @@ private class EventFinderImpl[F[_]: Async: SessionResource: QueriesExecutionTime .select[CategoryName *: LastSyncedDate *: EmptyTuple, (Project, Option[LastSyncedDate])]( sql"""SELECT proj.project_id, - proj.project_path, + proj.project_slug, sync_time.last_synced FROM project proj LEFT JOIN subscription_category_sync_time sync_time diff --git a/event-log/src/main/scala/io/renku/eventlog/events/producers/membersync/EventFinder.scala b/event-log/src/main/scala/io/renku/eventlog/events/producers/membersync/EventFinder.scala index 4a804b8e39..25cbbd46d5 100644 --- a/event-log/src/main/scala/io/renku/eventlog/events/producers/membersync/EventFinder.scala +++ b/event-log/src/main/scala/io/renku/eventlog/events/producers/membersync/EventFinder.scala @@ -60,7 +60,7 @@ private class EventFinderImpl[F[_]: MonadCancelThrow: SessionResource: QueriesEx CategoryName *: EventDate *: LastSyncedDate *: EventDate *: LastSyncedDate *: EventDate *: LastSyncedDate *: EmptyTuple, (projects.GitLabId, Option[LastSyncedDate], MemberSyncEvent) ]( - sql"""SELECT proj.project_id, sync_time.last_synced, proj.project_path + sql"""SELECT proj.project_id, sync_time.last_synced, proj.project_slug FROM project proj LEFT JOIN subscription_category_sync_time sync_time ON sync_time.project_id = proj.project_id AND sync_time.category_name = $categoryNameEncoder diff --git a/event-log/src/main/scala/io/renku/eventlog/events/producers/minprojectinfo/EventFinder.scala b/event-log/src/main/scala/io/renku/eventlog/events/producers/minprojectinfo/EventFinder.scala index 4e81a21aa0..ce85ce6c9b 100644 --- a/event-log/src/main/scala/io/renku/eventlog/events/producers/minprojectinfo/EventFinder.scala +++ b/event-log/src/main/scala/io/renku/eventlog/events/producers/minprojectinfo/EventFinder.scala @@ -54,7 +54,7 @@ private class EventFinderImpl[F[_]: MonadCancelThrow: SessionResource: QueriesEx SqlStatement .named(s"${categoryName.show.toLowerCase} - find event") .select[Void, MinProjectInfoEvent]( - sql"""SELECT p.project_id, p.project_path + sql"""SELECT p.project_id, p.project_slug FROM project p WHERE NOT EXISTS ( SELECT project_id diff --git a/event-log/src/main/scala/io/renku/eventlog/events/producers/projectsync/EventFinder.scala b/event-log/src/main/scala/io/renku/eventlog/events/producers/projectsync/EventFinder.scala index a73ad2427c..346cf9a594 100644 --- a/event-log/src/main/scala/io/renku/eventlog/events/producers/projectsync/EventFinder.scala +++ b/event-log/src/main/scala/io/renku/eventlog/events/producers/projectsync/EventFinder.scala @@ -52,10 +52,10 @@ private class EventFinderImpl[F[_]: MonadCancelThrow: SessionResource: QueriesEx SqlStatement .named(s"${categoryName.value.toLowerCase} - find event") .select[LastSyncedDate, (projects.GitLabId, Option[LastSyncedDate], ProjectSyncEvent)]( - sql"""SELECT candidate.project_id, candidate.real_sync, candidate.project_path + sql"""SELECT candidate.project_id, candidate.real_sync, candidate.project_slug FROM ( SELECT proj.project_id, - proj.project_path, + proj.project_slug, sync_time.last_synced, sync_time.last_synced AS real_sync FROM project proj @@ -64,7 +64,7 @@ private class EventFinderImpl[F[_]: MonadCancelThrow: SessionResource: QueriesEx WHERE ($lastSyncedDateEncoder - sync_time.last_synced) > INTERVAL '1 day' UNION SELECT proj.project_id, - proj.project_path, + proj.project_slug, TIMESTAMP WITH TIME ZONE 'epoch' AS last_synced, NULL AS real_sync FROM project proj diff --git a/event-log/src/main/scala/io/renku/eventlog/events/producers/triplesgenerated/EventFinder.scala b/event-log/src/main/scala/io/renku/eventlog/events/producers/triplesgenerated/EventFinder.scala index b92c59117d..379f46c8fc 100644 --- a/event-log/src/main/scala/io/renku/eventlog/events/producers/triplesgenerated/EventFinder.scala +++ b/event-log/src/main/scala/io/renku/eventlog/events/producers/triplesgenerated/EventFinder.scala @@ -81,7 +81,7 @@ private class EventFinderImpl[F[_]: Async: SessionResource: QueriesExecutionTime .named(s"${SubscriptionCategory.categoryName.value.toLowerCase} - find projects") .select[ExecutionDate *: ExecutionDate *: Int *: EmptyTuple, ProjectInfo]( sql""" - SELECT p.project_id, p.project_path, p.latest_event_date, + SELECT p.project_id, p.project_slug, p.latest_event_date, (SELECT count(event_id) FROM event evt_int WHERE evt_int.project_id = p.project_id AND evt_int.status = '#${TransformingTriples.value}') AS current_occupancy FROM ( SELECT DISTINCT evt.project_id, MAX(evt.event_date) AS max_event_date @@ -140,7 +140,7 @@ private class EventFinderImpl[F[_]: Async: SessionResource: QueriesExecutionTime TriplesGeneratedEvent ]( sql""" - SELECT evt.event_id, evt.project_id, $projectSlugEncoder AS project_path, evt_payload.payload + SELECT evt.event_id, evt.project_id, $projectSlugEncoder AS project_slug, evt_payload.payload FROM ( SELECT evt_int.project_id, max(event_date) AS max_event_date FROM event evt_int diff --git a/event-log/src/main/scala/io/renku/eventlog/events/producers/zombieevents/LongProcessingEventFinder.scala b/event-log/src/main/scala/io/renku/eventlog/events/producers/zombieevents/LongProcessingEventFinder.scala index 187f4461c9..50bfc2243a 100644 --- a/event-log/src/main/scala/io/renku/eventlog/events/producers/zombieevents/LongProcessingEventFinder.scala +++ b/event-log/src/main/scala/io/renku/eventlog/events/producers/zombieevents/LongProcessingEventFinder.scala @@ -90,7 +90,7 @@ private class LongProcessingEventFinder[F[_]: Async: SessionResource: QueriesExe projects.GitLabId *: EventStatus *: String *: ExecutionDate *: EventProcessingTime *: ExecutionDate *: EventProcessingTime *: EmptyTuple, ZombieEvent ]( - sql"""SELECT evt.event_id, evt.project_id, proj.project_path, evt.status + sql"""SELECT evt.event_id, evt.project_id, proj.project_slug, evt.status FROM event evt JOIN project proj ON proj.project_id = evt.project_id LEFT JOIN event_delivery ed ON ed.project_id = evt.project_id AND ed.event_id = evt.event_id diff --git a/event-log/src/main/scala/io/renku/eventlog/events/producers/zombieevents/LostSubscriberEventFinder.scala b/event-log/src/main/scala/io/renku/eventlog/events/producers/zombieevents/LostSubscriberEventFinder.scala index 3d00e2db25..2995813cd5 100644 --- a/event-log/src/main/scala/io/renku/eventlog/events/producers/zombieevents/LostSubscriberEventFinder.scala +++ b/event-log/src/main/scala/io/renku/eventlog/events/producers/zombieevents/LostSubscriberEventFinder.scala @@ -55,7 +55,7 @@ private class LostSubscriberEventFinder[F[_]: MonadCancelThrow: SessionResource: .named(s"${categoryName.value.toLowerCase} - lse - find events") .select[Void, ZombieEvent]( sql""" - SELECT DISTINCT evt.event_id, evt.project_id, proj.project_path, evt.status + SELECT DISTINCT evt.event_id, evt.project_id, proj.project_slug, evt.status FROM event_delivery delivery JOIN event evt ON evt.event_id = delivery.event_id AND evt.project_id = delivery.project_id diff --git a/event-log/src/main/scala/io/renku/eventlog/events/producers/zombieevents/LostZombieEventFinder.scala b/event-log/src/main/scala/io/renku/eventlog/events/producers/zombieevents/LostZombieEventFinder.scala index 990823b215..fc8c09963b 100644 --- a/event-log/src/main/scala/io/renku/eventlog/events/producers/zombieevents/LostZombieEventFinder.scala +++ b/event-log/src/main/scala/io/renku/eventlog/events/producers/zombieevents/LostZombieEventFinder.scala @@ -55,11 +55,11 @@ private class LostZombieEventFinder[F[_]: MonadCancelThrow: SessionResource: Que SqlStatement .named(s"${categoryName.value.toLowerCase} - lze - find event") .select[ExecutionDate *: EventProcessingTime *: EmptyTuple, ZombieEvent]( - sql"""SELECT evt.event_id, evt.project_id, proj.project_path, evt.status + sql"""SELECT evt.event_id, evt.project_id, proj.project_slug, evt.status FROM event evt JOIN project proj ON evt.project_id = proj.project_id WHERE - evt.status IN (#${ProcessingStatus.all.map(s => show"'$s'").mkString(", ")}) + evt.status IN (#${ProcessingStatus.all.map(s => s"'${s.value}'").mkString(", ")}) AND evt.message = '#$zombieMessage' AND (($executionDateEncoder - evt.execution_date) > $eventProcessingTimeEncoder) LIMIT 1 diff --git a/event-log/src/main/scala/io/renku/eventlog/init/DbInitializer.scala b/event-log/src/main/scala/io/renku/eventlog/init/DbInitializer.scala index dd682d10dc..a61ccf2dbe 100644 --- a/event-log/src/main/scala/io/renku/eventlog/init/DbInitializer.scala +++ b/event-log/src/main/scala/io/renku/eventlog/init/DbInitializer.scala @@ -82,6 +82,7 @@ object DbInitializer { TSMigrationTableCreator[F], CleanUpEventsTableCreator[F], ProjectIdOnCleanUpTable[F], + ProjectPathToSlug[F], FailedEventsRestorer[F]( "%Error: The repository is dirty. Please use the \"git\" command to clean it.%", currentStatus = GenerationNonRecoverableFailure, @@ -171,8 +172,7 @@ object DbInitializer { currentStatus = TransformationNonRecoverableFailure, destinationStatus = TriplesGenerated, discardingStatuses = TriplesStore :: Nil - ), - ProjectPathToSlug[F] + ) ) def apply[F[_]: Temporal: Logger: SessionResource](isMigrating: Ref[F, Boolean]): F[DbInitializer[F]] = diff --git a/event-log/src/main/scala/io/renku/eventlog/metrics/StatsFinder.scala b/event-log/src/main/scala/io/renku/eventlog/metrics/StatsFinder.scala index 8f0b3b1513..a742b6d7d8 100644 --- a/event-log/src/main/scala/io/renku/eventlog/metrics/StatsFinder.scala +++ b/event-log/src/main/scala/io/renku/eventlog/metrics/StatsFinder.scala @@ -217,15 +217,14 @@ class StatsFinderImpl[F[_]: Async: SessionResource: QueriesExecutionTimes]( private def prepareQuery(statuses: NonEmptyList[EventStatus]) = SqlStatement(name = "projects events count") - .select[Void, Slug ~ Long](sql"""SELECT - project_path, - (SELECT count(event_id) FROM event evt_int WHERE evt_int.project_id = prj.project_id AND status IN (#${statuses.toSql})) AS count - FROM project prj - WHERE EXISTS ( - SELECT project_id - FROM event evt - WHERE evt.project_id = prj.project_id AND status IN (#${statuses.toSql}) - ) + .select[Void, Slug ~ Long](sql"""SELECT project_slug, + (SELECT count(event_id) FROM event evt_int WHERE evt_int.project_id = prj.project_id AND status IN (#${statuses.toSql})) AS count + FROM project prj + WHERE EXISTS ( + SELECT project_id + FROM event evt + WHERE evt.project_id = prj.project_id AND status IN (#${statuses.toSql}) + ) """.query(projectSlugDecoder ~ int8).map { case slug ~ count => (slug, count) }) .arguments(Void) .build(_.toList) @@ -233,20 +232,18 @@ class StatsFinderImpl[F[_]: Async: SessionResource: QueriesExecutionTimes]( private def prepareQuery(statuses: NonEmptyList[EventStatus], limit: Int Refined Positive) = SqlStatement(name = "projects events count limit") .select[Int, (Slug, Long)]( - sql""" - SELECT - project_path, - (select count(event_id) FROM event evt_int WHERE evt_int.project_id = prj.project_id AND status IN (#${statuses.toSql})) AS count - FROM (select project_id, project_path, latest_event_date - FROM project - ORDER BY latest_event_date desc) prj - WHERE EXISTS ( - SELECT project_id - FROM event evt - WHERE evt.project_id = prj.project_id AND status IN (#${statuses.toSql}) - ) - LIMIT $int4; - """ + sql"""SELECT project_slug, + (select count(event_id) FROM event evt_int WHERE evt_int.project_id = prj.project_id AND status IN (#${statuses.toSql})) AS count + FROM (select project_id, project_slug, latest_event_date + FROM project + ORDER BY latest_event_date desc) prj + WHERE EXISTS ( + SELECT project_id + FROM event evt + WHERE evt.project_id = prj.project_id AND status IN (#${statuses.toSql}) + ) + LIMIT $int4 + """ .query(projectSlugDecoder ~ int8) .map { case projectSlug ~ count => (projectSlug, count) } ) diff --git a/event-log/src/test/scala/io/renku/eventlog/CleanUpEventsProvisioning.scala b/event-log/src/test/scala/io/renku/eventlog/CleanUpEventsProvisioning.scala index 373016b3fc..0093d9b033 100644 --- a/event-log/src/test/scala/io/renku/eventlog/CleanUpEventsProvisioning.scala +++ b/event-log/src/test/scala/io/renku/eventlog/CleanUpEventsProvisioning.scala @@ -40,7 +40,7 @@ trait CleanUpEventsProvisioning { execute { Kleisli { session => val query: Command[OffsetDateTime *: projects.GitLabId *: projects.Slug *: EmptyTuple] = sql""" - INSERT INTO clean_up_events_queue (date, project_id, project_path) + INSERT INTO clean_up_events_queue (date, project_id, project_slug) VALUES ($timestamptz, $projectIdEncoder, $projectSlugEncoder)""".command session .prepare(query) @@ -52,7 +52,7 @@ trait CleanUpEventsProvisioning { protected def findCleanUpEvents: List[(projects.GitLabId, projects.Slug)] = execute { Kleisli { session => val query: Query[Void, projects.GitLabId ~ projects.Slug] = sql""" - SELECT project_id, project_path + SELECT project_id, project_slug FROM clean_up_events_queue ORDER BY date DESC""" .query(projectIdDecoder ~ projectSlugDecoder) diff --git a/event-log/src/test/scala/io/renku/eventlog/EventLogDataProvisioning.scala b/event-log/src/test/scala/io/renku/eventlog/EventLogDataProvisioning.scala index 7a02fa368c..ff266aa716 100644 --- a/event-log/src/test/scala/io/renku/eventlog/EventLogDataProvisioning.scala +++ b/event-log/src/test/scala/io/renku/eventlog/EventLogDataProvisioning.scala @@ -147,23 +147,26 @@ trait EventLogDataProvisioning { } } - protected def upsertProject(compoundEventId: CompoundEventId, projectSlug: Slug, eventDate: EventDate): Unit = - upsertProject(compoundEventId.projectId, projectSlug, eventDate) + protected def upsertProject(compoundEventId: CompoundEventId, + projectSlug: projects.Slug, + eventDate: EventDate + ): Unit = upsertProject(compoundEventId.projectId, projectSlug, eventDate) protected def upsertProject(project: consumers.Project, eventDate: EventDate): Unit = upsertProject(project.id, project.slug, eventDate) - protected def upsertProject(projectId: projects.GitLabId, projectSlug: Slug, eventDate: EventDate): Unit = execute { - Kleisli { session => - val query: Command[projects.GitLabId *: projects.Slug *: EventDate *: EmptyTuple] = - sql"""INSERT INTO project (project_id, project_path, latest_event_date) + protected def upsertProject(projectId: projects.GitLabId, projectSlug: projects.Slug, eventDate: EventDate): Unit = + execute { + Kleisli { session => + val query: Command[projects.GitLabId *: projects.Slug *: EventDate *: EmptyTuple] = + sql"""INSERT INTO project (project_id, project_slug, latest_event_date) VALUES ($projectIdEncoder, $projectSlugEncoder, $eventDateEncoder) ON CONFLICT (project_id) DO UPDATE SET latest_event_date = excluded.latest_event_date WHERE excluded.latest_event_date > project.latest_event_date """.command - session.prepare(query).flatMap(_.execute(projectId *: projectSlug *: eventDate *: EmptyTuple)).void + session.prepare(query).flatMap(_.execute(projectId *: projectSlug *: eventDate *: EmptyTuple)).void + } } - } protected def upsertEventPayload(compoundEventId: CompoundEventId, eventStatus: EventStatus, diff --git a/event-log/src/test/scala/io/renku/eventlog/events/consumers/creation/EventPersisterSpec.scala b/event-log/src/test/scala/io/renku/eventlog/events/consumers/creation/EventPersisterSpec.scala index 20b935b024..b82f695ab8 100644 --- a/event-log/src/test/scala/io/renku/eventlog/events/consumers/creation/EventPersisterSpec.scala +++ b/event-log/src/test/scala/io/renku/eventlog/events/consumers/creation/EventPersisterSpec.scala @@ -152,7 +152,7 @@ class EventPersisterSpec storedProjects shouldBe List((event1.project.id, event1.project.slug, event2.date)) } - "update latest_event_date and project_path for a project " + + "update latest_event_date and project_slug for a project " + "only if there's an event with more recent Event Date added" in new TestCase { // storing event 1 @@ -179,7 +179,7 @@ class EventPersisterSpec storedProjects shouldBe List((event1.project.id, event2.project.slug, event2.date)) } - "do not update latest_event_date and project_path for a project " + + "do not update latest_event_date and project_slug for a project " + "only if there's an event with less recent Event Date added" in new TestCase { // storing event 1 @@ -354,7 +354,7 @@ class EventPersisterSpec private def storedProjects: List[(projects.GitLabId, projects.Slug, EventDate)] = execute { Kleisli { session => val query: Query[Void, (projects.GitLabId, projects.Slug, EventDate)] = - sql"""SELECT project_id, project_path, latest_event_date + sql"""SELECT project_id, project_slug, latest_event_date FROM project""" .query(projectIdDecoder ~ projectSlugDecoder ~ eventDateDecoder) .map { case projectId ~ projectSlug ~ eventDate => (projectId, projectSlug, eventDate) } diff --git a/event-log/src/test/scala/io/renku/eventlog/init/EventStatusRenamerSpec.scala b/event-log/src/test/scala/io/renku/eventlog/init/EventStatusRenamerSpec.scala index 6d7069a761..409983bc3e 100644 --- a/event-log/src/test/scala/io/renku/eventlog/init/EventStatusRenamerSpec.scala +++ b/event-log/src/test/scala/io/renku/eventlog/init/EventStatusRenamerSpec.scala @@ -44,7 +44,6 @@ class EventStatusRenamerSpec with IOSpec with DbInitSpec with should.Matchers - with EventLogDataProvisioning with EventDataFetching { protected[init] override lazy val migrationsToRun: List[DbMigrator[IO]] = allMigrations.takeWhile { @@ -113,24 +112,24 @@ class EventStatusRenamerSpec } private def store(event: Event, withStatus: String): Unit = { - upsertProject(event.compoundEventId, event.project.slug, event.date) + upsertProject(event.compoundEventId.projectId, event.project.slug, event.date) execute[Unit] { Kleisli { session => val query: Command[ EventId *: projects.GitLabId *: String *: CreatedDate *: ExecutionDate *: EventDate *: String *: BatchDate *: EmptyTuple ] = - sql"""INSERT INTO - event (event_id, project_id, status, created_date, execution_date, event_date, event_body, batch_date) - values ( - $eventIdEncoder, - $projectIdEncoder, - $varchar, - $createdDateEncoder, - $executionDateEncoder, - $eventDateEncoder, - $text, - $batchDateEncoder) - """.command + sql"""INSERT INTO event (event_id, project_id, status, created_date, execution_date, event_date, event_body, batch_date) + VALUES ( + $eventIdEncoder, + $projectIdEncoder, + $varchar, + $createdDateEncoder, + $executionDateEncoder, + $eventDateEncoder, + $text, + $batchDateEncoder + ) + """.command session .prepare(query) .flatMap( @@ -151,8 +150,20 @@ class EventStatusRenamerSpec } } - private def toJsonBody(event: Event): String = - json"""{ + private def upsertProject(projectId: projects.GitLabId, projectSlug: projects.Slug, eventDate: EventDate): Unit = + execute[Unit] { + Kleisli { session => + val query: Command[projects.GitLabId *: projects.Slug *: EventDate *: EmptyTuple] = + sql"""INSERT INTO project (project_id, project_path, latest_event_date) + VALUES ($projectIdEncoder, $projectSlugEncoder, $eventDateEncoder) + ON CONFLICT (project_id) + DO UPDATE SET latest_event_date = excluded.latest_event_date WHERE excluded.latest_event_date > project.latest_event_date + """.command + session.prepare(query).flatMap(_.execute(projectId *: projectSlug *: eventDate *: EmptyTuple)).void + } + } + + private def toJsonBody(event: Event): String = json"""{ "project": { "id": ${event.project.id}, "slug": ${event.project.slug} From 3396e06333a2333f765c003d7e31542cc42f4a23 Mon Sep 17 00:00:00 2001 From: Jakub Chrobasik Date: Wed, 2 Aug 2023 19:18:27 +0200 Subject: [PATCH 4/4] fix: ZombieEventDetectionSpec expecting project.project_path --- .../renku/graph/acceptancetests/ZombieEventDetectionSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/ZombieEventDetectionSpec.scala b/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/ZombieEventDetectionSpec.scala index feda32fa47..c35c8a28d5 100644 --- a/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/ZombieEventDetectionSpec.scala +++ b/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/ZombieEventDetectionSpec.scala @@ -100,7 +100,7 @@ class ZombieEventDetectionSpec private def insertProjectToDB(project: data.Project, eventDate: EventDate): Int = EventLog.execute { session => val query: Command[GitLabId *: Slug *: EventDate *: EmptyTuple] = - sql"""INSERT INTO project (project_id, project_path, latest_event_date) + sql"""INSERT INTO project (project_id, project_slug, latest_event_date) VALUES ($projectIdEncoder, $projectSlugEncoder, $eventDateEncoder) ON CONFLICT (project_id) DO UPDATE SET latest_event_date = excluded.latest_event_date WHERE excluded.latest_event_date > project.latest_event_date