From 6a80c63ea0b5d0ce1b178362070fac52d40f52c1 Mon Sep 17 00:00:00 2001 From: Alessandro Zoffoli Date: Mon, 23 Sep 2024 18:03:07 +0200 Subject: [PATCH] Fix gcp pubsub monitoring, rework specs --- build.sbt | 4 +- gcp/pubsub/integration/README.md | 9 +++ .../queue/pubsub/PubSubClientSuite.scala | 16 ++++- .../queue/gcp/pubsub/PubSubClient.scala | 59 +++++++++++++++---- .../queue/gcp/pubsub/PubSubStatsFetcher.scala | 15 +++-- .../queue/testkit/QueueClientSuite.scala | 41 ++++++++++++- .../queue/testkit/QueuePublisherSuite.scala | 39 ++++++++---- .../queue/testkit/QueueStatisticsSuite.scala | 31 +++++----- .../queue/testkit/QueueSubscriberSuite.scala | 47 ++++++--------- 9 files changed, 180 insertions(+), 81 deletions(-) create mode 100644 gcp/pubsub/integration/README.md diff --git a/build.sbt b/build.sbt index fa1ef7b..2a7c3fb 100644 --- a/build.sbt +++ b/build.sbt @@ -189,7 +189,9 @@ lazy val gcpPubSub = crossProject(JVMPlatform) "com.commercetools.queue.gcp.pubsub.PubSubClient.unmanaged$default$5"), ProblemFilters.exclude[DirectMissingMethodProblem]("com.commercetools.queue.gcp.pubsub.PubSubPublisher.this"), ProblemFilters.exclude[DirectMissingMethodProblem]("com.commercetools.queue.gcp.pubsub.PubSubPuller.this"), - ProblemFilters.exclude[DirectMissingMethodProblem]("com.commercetools.queue.gcp.pubsub.PubSubSubscriber.this") + ProblemFilters.exclude[DirectMissingMethodProblem]("com.commercetools.queue.gcp.pubsub.PubSubSubscriber.this"), + ProblemFilters.exclude[IncompatibleMethTypeProblem]("com.commercetools.queue.gcp.pubsub.PubSubClient.unmanaged"), + ProblemFilters.exclude[DirectMissingMethodProblem]("com.commercetools.queue.gcp.pubsub.PubSubClient.apply") ), libraryDependencies ++= List( "com.google.cloud" % "google-cloud-pubsub" % "1.132.2", diff --git a/gcp/pubsub/integration/README.md b/gcp/pubsub/integration/README.md new file mode 100644 index 0000000..9a69e20 --- /dev/null +++ b/gcp/pubsub/integration/README.md @@ -0,0 +1,9 @@ +# How to run tests + +Tests are using [Application Default Credentials](https://cloud.google.com/docs/authentication/application-default-credentials). +Make sure your service account is assigned with the right scopes, e.g.: `roles/pubsub.admin`, `roles/monitoring.viewer`. +Steps: +- `export GOOGLE_APPLICATION_CREDENTIALS="~/keyfile.json"` +- `export GCP_PUBSUB_USE_EMULATOR=false` +- `export GCP_PUBSUB_PROJECT=` +- `sbt "project gcpPubSubIt" test` diff --git a/gcp/pubsub/integration/src/test/scala/com/commercetools/queue/pubsub/PubSubClientSuite.scala b/gcp/pubsub/integration/src/test/scala/com/commercetools/queue/pubsub/PubSubClientSuite.scala index 5d40f56..99e6db6 100644 --- a/gcp/pubsub/integration/src/test/scala/com/commercetools/queue/pubsub/PubSubClientSuite.scala +++ b/gcp/pubsub/integration/src/test/scala/com/commercetools/queue/pubsub/PubSubClientSuite.scala @@ -20,7 +20,10 @@ import cats.effect.{IO, Resource} import com.commercetools.queue.QueueClient import com.commercetools.queue.gcp.pubsub.PubSubClient import com.commercetools.queue.testkit.QueueClientSuite -import com.google.api.gax.core.NoCredentialsProvider +import com.google.api.gax.core.{GoogleCredentialsProvider, NoCredentialsProvider} + +import scala.concurrent.duration.{Duration, DurationInt} +import scala.jdk.CollectionConverters._ class PubSubClientSuite extends QueueClientSuite { @@ -33,12 +36,21 @@ class PubSubClientSuite extends QueueClientSuite { override val messagesStatsSupported: Boolean = // // not supported in the emulator !sys.env.get(isEmulatorEnvVar).map(_.toBoolean).getOrElse(isEmulatorDefault) + // stats require a long time to be propagated and be available + override def munitIOTimeout: Duration = 15.minutes + private def config = booleanOrDefault(isEmulatorEnvVar, default = isEmulatorDefault).ifM( ifTrue = IO.pure(("test-project", NoCredentialsProvider.create(), Some("localhost:8042"))), ifFalse = for { project <- string("GCP_PUBSUB_PROJECT") - credentials = NoCredentialsProvider.create() // TODO + credentials = GoogleCredentialsProvider + .newBuilder() + .setScopesToApply(List( + "https://www.googleapis.com/auth/pubsub", // only pubsub, full access + "https://www.googleapis.com/auth/monitoring.read" // monitoring (for fetching stats) + ).asJava) + .build() } yield (project, credentials, None) ) diff --git a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubClient.scala b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubClient.scala index e268eee..efbe6eb 100644 --- a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubClient.scala +++ b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubClient.scala @@ -17,16 +17,18 @@ package com.commercetools.queue.gcp.pubsub import cats.effect.{Async, Resource} +import cats.syntax.all._ import com.commercetools.queue._ import com.google.api.gax.core.CredentialsProvider import com.google.api.gax.grpc.GrpcTransportChannel import com.google.api.gax.rpc.{FixedTransportChannelProvider, TransportChannelProvider} import com.google.pubsub.v1.{SubscriptionName, TopicName} -import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder +import io.grpc.netty.shaded.io.grpc.netty.{GrpcSslContexts, NettyChannelBuilder} private class PubSubClient[F[_]: Async] private ( project: String, channelProvider: TransportChannelProvider, + monitoringChannelProvider: TransportChannelProvider, credentials: CredentialsProvider, endpoint: Option[String]) extends UnsealedQueueClient[F] { @@ -35,7 +37,12 @@ private class PubSubClient[F[_]: Async] private ( new PubSubAdministration[F](project, channelProvider, credentials, endpoint) override def statistics(name: String): QueueStatistics[F] = - new PubSubStatistics(name, SubscriptionName.of(project, s"fs2-queue-$name"), channelProvider, credentials, endpoint) + new PubSubStatistics( + name, + SubscriptionName.of(project, s"fs2-queue-$name"), + monitoringChannelProvider, + credentials, + endpoint) override def publish[T: Serializer](name: String): QueuePublisher[F, T] = new PubSubPublisher[F, T](name, TopicName.of(project, name), channelProvider, credentials, endpoint) @@ -52,34 +59,62 @@ private class PubSubClient[F[_]: Async] private ( object PubSubClient { - private def makeDefaultTransportChannel(endpoint: Option[String]): GrpcTransportChannel = + private def makeDefaultTransportChannel(endpoint: Option[String]): GrpcTransportChannel = { + val builder = endpoint match { + case Some(value) => + NettyChannelBuilder + .forTarget(value) + .usePlaintext() + case None => + NettyChannelBuilder + .forTarget("pubsub.googleapis.com:443") + .sslContext(GrpcSslContexts.forClient().build()) + } + GrpcTransportChannel.create(builder.build) + } + + private def makeDefaultMonitoringTransportChannel: GrpcTransportChannel = GrpcTransportChannel.create( NettyChannelBuilder - .forTarget(endpoint.getOrElse("https://pubsub.googleapis.com")) - .usePlaintext() - .build() + .forTarget("monitoring.googleapis.com:443") + .sslContext(GrpcSslContexts.forClient().build()) + .build ) def apply[F[_]]( project: String, credentials: CredentialsProvider, endpoint: Option[String] = None, - mkTransportChannel: Option[String] => GrpcTransportChannel = makeDefaultTransportChannel + mkTransportChannel: Option[String] => GrpcTransportChannel = makeDefaultTransportChannel, + mkMonitoringTransportChannel: => GrpcTransportChannel = makeDefaultMonitoringTransportChannel )(implicit F: Async[F] ): Resource[F, QueueClient[F]] = - Resource - .fromAutoCloseable(F.blocking(mkTransportChannel(endpoint))) - .map { channel => - new PubSubClient[F](project, FixedTransportChannelProvider.create(channel), credentials, endpoint) + ( + Resource.fromAutoCloseable(F.blocking(mkTransportChannel(endpoint))), + Resource.fromAutoCloseable(F.blocking(mkMonitoringTransportChannel))) + .mapN { (channel, monitoringChannel) => + new PubSubClient[F]( + project = project, + channelProvider = FixedTransportChannelProvider.create(channel), + monitoringChannelProvider = FixedTransportChannelProvider.create(monitoringChannel), + credentials = credentials, + endpoint = endpoint + ) } def unmanaged[F[_]]( project: String, credentials: CredentialsProvider, channelProvider: TransportChannelProvider, + monitoringChannelProvider: TransportChannelProvider, endpoint: Option[String] = None )(implicit F: Async[F] ): QueueClient[F] = - new PubSubClient[F](project, channelProvider, credentials, endpoint) + new PubSubClient[F]( + project = project, + channelProvider = channelProvider, + monitoringChannelProvider = monitoringChannelProvider, + credentials = credentials, + endpoint = endpoint) } diff --git a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubStatsFetcher.scala b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubStatsFetcher.scala index 8eeae54..4604cff 100644 --- a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubStatsFetcher.scala +++ b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubStatsFetcher.scala @@ -59,12 +59,15 @@ private class PubSubStatsFetcher[F[_]]( }) } .flatMap { response => - val datapoints: List[Point] = response.getTimeSeries(0).getPointsList().asScala.toList - datapoints.sortBy(-_.getInterval().getEndTime().getSeconds()).headOption match { - case Some(value) => - F.pure(QueueStats(value.getValue().getInt64Value().toInt, None, None)) - case None => - F.raiseError(MalformedQueueConfigurationException(queueName, "messages", "")) + if (response.getTimeSeriesCount == 0) F.pure(QueueStats(0, None, None)) + else { + val datapoints: List[Point] = response.getTimeSeries(0).getPointsList().asScala.toList + datapoints.sortBy(-_.getInterval().getEndTime().getSeconds()).headOption match { + case Some(value) => + F.pure(QueueStats(value.getValue().getInt64Value().toInt, None, None)) + case None => + F.raiseError(MalformedQueueConfigurationException(queueName, "messages", "")) + } } } diff --git a/testkit/src/main/scala/com/commercetools/queue/testkit/QueueClientSuite.scala b/testkit/src/main/scala/com/commercetools/queue/testkit/QueueClientSuite.scala index f3011d3..7b81176 100644 --- a/testkit/src/main/scala/com/commercetools/queue/testkit/QueueClientSuite.scala +++ b/testkit/src/main/scala/com/commercetools/queue/testkit/QueueClientSuite.scala @@ -3,6 +3,7 @@ package com.commercetools.queue.testkit import cats.effect.std.{Env, Random} import cats.effect.{IO, Resource, SyncIO} import com.commercetools.queue.QueueClient +import munit.Location import munit.catseffect.IOFixture import scala.concurrent.duration._ @@ -26,7 +27,7 @@ abstract class QueueClientSuite def booleanOrDefault(varName: String, default: Boolean): IO[Boolean] = optBoolean(varName).map(_.getOrElse(default)) - override def munitIOTimeout: Duration = 1.minute + override def munitIOTimeout: Duration = 10.minute /** Override these if the given provider is not supporting these features */ val queueUpdateSupported: Boolean = true @@ -54,7 +55,7 @@ abstract class QueueClientSuite final def randomMessages(n: Int): IO[List[(String, Map[String, String])]] = for { random <- Random.scalaUtilRandom[IO] - size <- random.nextIntBounded(n) + size <- random.nextIntBounded(n - 1).map(_ + 1) // > 0 } yield messages(size) final def messages(n: Int): List[(String, Map[String, String])] = @@ -62,4 +63,40 @@ abstract class QueueClientSuite .range(0, n) .map(i => (i.toString, Map(s"metadata-$i-key" -> s"$i-value", s"metadata-$i-another-key" -> "another-value"))) + def eventuallyIO[A, B]( + obtained: IO[A], + returns: B, + clue: => Any = "values are not the same", + retries: Int = 200, + delay: FiniteDuration = 5.second + )(implicit + loc: Location, + ev: B <:< A + ): IO[Unit] = for { + t <- IO.realTimeInstant + _ <- assertIO(obtained, returns, clue).handleErrorWith(err => + if (retries > 0) + IO.println(s" ${t.toString} - retrying assertion after $delay, remaining retries: $retries") >> + IO.sleep(delay) >> + eventuallyIO(obtained, returns, clue, retries - 1, delay) + else IO.raiseError(err)) + } yield () + + def eventuallyBoolean( + cond: IO[Boolean], + clue: => Any = "values are not the same", + retries: Int = 200, + delay: FiniteDuration = 5.second + )(implicit + loc: Location + ): IO[Unit] = for { + t <- IO.realTimeInstant + _ <- assertIOBoolean(cond, clue).handleErrorWith(err => + if (retries > 0) + IO.println(s" ${t.toString} - retrying assertion after $delay, remaining retries: $retries") >> + IO.sleep(delay) >> + eventuallyBoolean(cond, clue, retries - 1, delay) + else IO.raiseError(err)) + } yield () + } diff --git a/testkit/src/main/scala/com/commercetools/queue/testkit/QueuePublisherSuite.scala b/testkit/src/main/scala/com/commercetools/queue/testkit/QueuePublisherSuite.scala index 68d95c4..8e3ca75 100644 --- a/testkit/src/main/scala/com/commercetools/queue/testkit/QueuePublisherSuite.scala +++ b/testkit/src/main/scala/com/commercetools/queue/testkit/QueuePublisherSuite.scala @@ -1,5 +1,7 @@ package com.commercetools.queue.testkit +import cats.effect.IO +import cats.effect.kernel.Ref import munit.CatsEffectSuite import fs2.Stream import cats.syntax.all._ @@ -13,35 +15,48 @@ import scala.concurrent.duration.DurationInt trait QueuePublisherSuite extends CatsEffectSuite { self: QueueClientSuite => withQueue.test("sink publishes all the messages") { queueName => - assume(messagesStatsSupported) val client = clientFixture() for { - msgs <- randomMessages(30) + msgs <- randomMessages(10) + count <- Ref.of[IO, Int](0) _ <- Stream .emits(msgs) .through(client.publish(queueName).sink(batchSize = 10)) .compile .drain - messagesInQueue <- client.statistics(queueName).fetcher.use(_.fetch).map(_.messages) - _ = assertEquals(messagesInQueue, msgs.size) + _ <- client + .subscribe(queueName) + .processWithAutoAck(10, 10.seconds)(_ => count.update(_ + 1)) + .take(msgs.size.toLong) + .compile + .drain + .timeout(30.seconds) + _ <- assertIO(count.get, msgs.size) } yield () } withQueue.test("sink publishes all the messages with a delay") { queueName => - assume(messagesStatsSupported && delayedMessagesStatsSupported) val client = clientFixture() for { - msgs <- randomMessages(30) + msgs <- randomMessages(10) _ <- Stream .emits(msgs) - .through(client.publish(queueName).sink(batchSize = 10, delay = 1.minute.some)) + .through(client.publish(queueName).sink(batchSize = 10, delay = 10.seconds.some)) .compile .drain - statsFetcher = client.statistics(queueName).fetcher - messagesInQueue <- statsFetcher.use(_.fetch).map(_.messages) - delayedMessages <- statsFetcher.use(_.fetch).map(_.delayed) - _ = assertEquals(delayedMessages, msgs.size.some, "delayed messages are not what we expect") - _ = assertEquals(messagesInQueue, 0, "the queue is not empty") + _ <- client + .subscribe(queueName) + .puller + .use(puller => + for { + _ <- eventuallyBoolean( + puller.pullBatch(1, 1.second).map(_.isEmpty), + "chunk is not empty, messages are not getting delayed") + _ <- IO.sleep(10.seconds) + _ <- eventuallyBoolean( + puller.pullBatch(1, 10.seconds).map(chunk => !chunk.isEmpty), + "got no messages after delay") + } yield ()) } yield () } diff --git a/testkit/src/main/scala/com/commercetools/queue/testkit/QueueStatisticsSuite.scala b/testkit/src/main/scala/com/commercetools/queue/testkit/QueueStatisticsSuite.scala index 0c62044..a70d058 100644 --- a/testkit/src/main/scala/com/commercetools/queue/testkit/QueueStatisticsSuite.scala +++ b/testkit/src/main/scala/com/commercetools/queue/testkit/QueueStatisticsSuite.scala @@ -1,6 +1,5 @@ package com.commercetools.queue.testkit -import cats.effect.IO import cats.syntax.all._ import fs2.Stream import munit.CatsEffectSuite @@ -23,19 +22,12 @@ trait QueueStatisticsSuite extends CatsEffectSuite { self: QueueClientSuite => .through(client.publish(queueName).sink(batchSize = 10)) .compile .drain - statsFetcher = client.statistics(queueName).fetcher - _ <- statsFetcher - .use(_.fetch) - .map(stats => assertEquals(stats.messages, messages.size, "Queue should be full")) _ <- client - .subscribe(queueName) - .processWithAutoAck(batchSize = 10, waitingTime = 20.seconds)(_ => IO.unit) - .take(messages.size.toLong) - .compile - .drain - _ <- statsFetcher - .use(_.fetch) - .map(stats => assertEquals(stats.messages, 0, "Queue should be empty")) + .statistics(queueName) + .fetcher + .use { fetcher => + eventuallyIO(fetcher.fetch.map(_.messages), messages.size, "Queue should be full") + } } yield () } @@ -53,8 +45,11 @@ trait QueueStatisticsSuite extends CatsEffectSuite { self: QueueClientSuite => .use { case (puller, statsFetcher) => for { chunk <- puller.pullBatch(10, 10.seconds) - stats <- statsFetcher.fetch - } yield assertEquals(stats.inflight, chunk.size.some, "Inflight stats doesn't match pulled messages") + _ <- eventuallyIO( + statsFetcher.fetch.map(_.inflight), + chunk.size.some, + "Inflight stats doesn't match pulled messages") + } yield () } } yield () } @@ -73,8 +68,10 @@ trait QueueStatisticsSuite extends CatsEffectSuite { self: QueueClientSuite => .statistics(queueName) .fetcher .use { statsFetcher => - statsFetcher.fetch.map(stats => - assertEquals(stats.delayed, messages.size.some, "Delayed stats doesn't match pulled messages")) + eventuallyIO( + statsFetcher.fetch.map(_.delayed), + messages.size.some, + "Delayed stats doesn't match pulled messages") } } yield () } diff --git a/testkit/src/main/scala/com/commercetools/queue/testkit/QueueSubscriberSuite.scala b/testkit/src/main/scala/com/commercetools/queue/testkit/QueueSubscriberSuite.scala index 749961d..0e9dab4 100644 --- a/testkit/src/main/scala/com/commercetools/queue/testkit/QueueSubscriberSuite.scala +++ b/testkit/src/main/scala/com/commercetools/queue/testkit/QueueSubscriberSuite.scala @@ -110,49 +110,38 @@ trait QueueSubscriberSuite extends CatsEffectSuite { self: QueueClientSuite => else true } }.void) - _ <- - if (messagesStatsSupported) - assertIO( - client.statistics(queueName).fetcher.use(_.fetch).map(_.messages), - 0, - "not all the messages got acked") - else IO.unit + _ <- client + .subscribe(queueName) + .puller + .use(puller => assertIO(puller.pullBatch(1, 1.second), Chunk.empty, "not all messages got consumed")) } yield () } withQueue.test("attemptProcessWithAutoAck acks/nacks accordingly") { queueName => + val client = clientFixture() for { - toBeAckedRef <- Ref[IO].of(Set.empty[String]) - toBeNackedRef <- Ref[IO].of(Set.empty[String]) - client = clientFixture() _ <- Stream .emits(messages(10)) .through(client.publish(queueName).sink(batchSize = 10)) .compile .drain - _ <- client + res <- client .subscribe(queueName) .attemptProcessWithAutoAck(batchSize = 10, waitingTime = 20.seconds)(msg => - if (msg.rawPayload.toInt % 2 == 0) toBeAckedRef.update(_ + msg.rawPayload) - else toBeNackedRef.update(_ + msg.rawPayload) >> IO.raiseError(new RuntimeException("failed"))) + if (msg.rawPayload.toInt % 2 == 0) IO.unit + else IO.raiseError(new RuntimeException("failed"))) .take(10L) .compile - .drain - toBeAcked <- toBeAckedRef.get - toBeNacked <- toBeNackedRef.get - _ = assertEquals(toBeAcked, Set("0", "2", "4", "6", "8")) - _ = assertEquals(toBeNacked, Set("1", "3", "5", "7", "9")) - _ <- - if (messagesStatsSupported) - assertIOBoolean( - client - .statistics(queueName) - .fetcher - .use(_.fetch) - .map(stats => stats.messages + stats.inflight.getOrElse(0) == 5), - "not all the expected messages got nacked" - ) - else IO.unit + .toList + _ = assert(res.count(_.isLeft) == 5) + _ = assert(res.count(_.isRight) == 5) + _ <- client + .subscribe(queueName) + .puller + .use(puller => + eventuallyBoolean( + puller.pullBatch(1, 1.second).map(chunk => !chunk.isEmpty), + "expecting to have nacked messages back in the queue")) } yield () }