Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More love for GCP Pub/Sub #57

Merged
merged 1 commit into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,9 @@ lazy val gcpPubSub = crossProject(JVMPlatform)
"com.commercetools.queue.gcp.pubsub.PubSubClient.unmanaged$default$5"),
ProblemFilters.exclude[DirectMissingMethodProblem]("com.commercetools.queue.gcp.pubsub.PubSubPublisher.this"),
ProblemFilters.exclude[DirectMissingMethodProblem]("com.commercetools.queue.gcp.pubsub.PubSubPuller.this"),
ProblemFilters.exclude[DirectMissingMethodProblem]("com.commercetools.queue.gcp.pubsub.PubSubSubscriber.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("com.commercetools.queue.gcp.pubsub.PubSubSubscriber.this"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("com.commercetools.queue.gcp.pubsub.PubSubClient.unmanaged"),
ProblemFilters.exclude[DirectMissingMethodProblem]("com.commercetools.queue.gcp.pubsub.PubSubClient.apply")
),
libraryDependencies ++= List(
"com.google.cloud" % "google-cloud-pubsub" % "1.132.2",
Expand Down
9 changes: 9 additions & 0 deletions gcp/pubsub/integration/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# How to run tests

Tests are using [Application Default Credentials](https://cloud.google.com/docs/authentication/application-default-credentials).
Make sure your service account is assigned with the right scopes, e.g.: `roles/pubsub.admin`, `roles/monitoring.viewer`.
Steps:
- `export GOOGLE_APPLICATION_CREDENTIALS="~/keyfile.json"`
- `export GCP_PUBSUB_USE_EMULATOR=false`
- `export GCP_PUBSUB_PROJECT=<your-project>`
- `sbt "project gcpPubSubIt" test`
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ import cats.effect.{IO, Resource}
import com.commercetools.queue.QueueClient
import com.commercetools.queue.gcp.pubsub.PubSubClient
import com.commercetools.queue.testkit.QueueClientSuite
import com.google.api.gax.core.NoCredentialsProvider
import com.google.api.gax.core.{GoogleCredentialsProvider, NoCredentialsProvider}

import scala.concurrent.duration.{Duration, DurationInt}
import scala.jdk.CollectionConverters._

class PubSubClientSuite extends QueueClientSuite {

Expand All @@ -33,12 +36,21 @@ class PubSubClientSuite extends QueueClientSuite {
override val messagesStatsSupported: Boolean = // // not supported in the emulator
!sys.env.get(isEmulatorEnvVar).map(_.toBoolean).getOrElse(isEmulatorDefault)

// stats require a long time to be propagated and be available
override def munitIOTimeout: Duration = 15.minutes

private def config =
booleanOrDefault(isEmulatorEnvVar, default = isEmulatorDefault).ifM(
ifTrue = IO.pure(("test-project", NoCredentialsProvider.create(), Some("localhost:8042"))),
ifFalse = for {
project <- string("GCP_PUBSUB_PROJECT")
credentials = NoCredentialsProvider.create() // TODO
credentials = GoogleCredentialsProvider
.newBuilder()
.setScopesToApply(List(
"https://www.googleapis.com/auth/pubsub", // only pubsub, full access
"https://www.googleapis.com/auth/monitoring.read" // monitoring (for fetching stats)
).asJava)
.build()
} yield (project, credentials, None)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,18 @@
package com.commercetools.queue.gcp.pubsub

import cats.effect.{Async, Resource}
import cats.syntax.all._
import com.commercetools.queue._
import com.google.api.gax.core.CredentialsProvider
import com.google.api.gax.grpc.GrpcTransportChannel
import com.google.api.gax.rpc.{FixedTransportChannelProvider, TransportChannelProvider}
import com.google.pubsub.v1.{SubscriptionName, TopicName}
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder
import io.grpc.netty.shaded.io.grpc.netty.{GrpcSslContexts, NettyChannelBuilder}

private class PubSubClient[F[_]: Async] private (
project: String,
channelProvider: TransportChannelProvider,
monitoringChannelProvider: TransportChannelProvider,
credentials: CredentialsProvider,
endpoint: Option[String])
extends UnsealedQueueClient[F] {
Expand All @@ -35,7 +37,12 @@ private class PubSubClient[F[_]: Async] private (
new PubSubAdministration[F](project, channelProvider, credentials, endpoint)

override def statistics(name: String): QueueStatistics[F] =
new PubSubStatistics(name, SubscriptionName.of(project, s"fs2-queue-$name"), channelProvider, credentials, endpoint)
new PubSubStatistics(
name,
SubscriptionName.of(project, s"fs2-queue-$name"),
monitoringChannelProvider,
credentials,
endpoint)

override def publish[T: Serializer](name: String): QueuePublisher[F, T] =
new PubSubPublisher[F, T](name, TopicName.of(project, name), channelProvider, credentials, endpoint)
Expand All @@ -52,34 +59,62 @@ private class PubSubClient[F[_]: Async] private (

object PubSubClient {

private def makeDefaultTransportChannel(endpoint: Option[String]): GrpcTransportChannel =
private def makeDefaultTransportChannel(endpoint: Option[String]): GrpcTransportChannel = {
val builder = endpoint match {
case Some(value) =>
NettyChannelBuilder
.forTarget(value)
.usePlaintext()
case None =>
NettyChannelBuilder
.forTarget("pubsub.googleapis.com:443")
.sslContext(GrpcSslContexts.forClient().build())
}
GrpcTransportChannel.create(builder.build)
}

private def makeDefaultMonitoringTransportChannel: GrpcTransportChannel =
GrpcTransportChannel.create(
NettyChannelBuilder
.forTarget(endpoint.getOrElse("https://pubsub.googleapis.com"))
.usePlaintext()
.build()
.forTarget("monitoring.googleapis.com:443")
.sslContext(GrpcSslContexts.forClient().build())
.build
)

def apply[F[_]](
project: String,
credentials: CredentialsProvider,
endpoint: Option[String] = None,
mkTransportChannel: Option[String] => GrpcTransportChannel = makeDefaultTransportChannel
mkTransportChannel: Option[String] => GrpcTransportChannel = makeDefaultTransportChannel,
mkMonitoringTransportChannel: => GrpcTransportChannel = makeDefaultMonitoringTransportChannel
)(implicit F: Async[F]
): Resource[F, QueueClient[F]] =
Resource
.fromAutoCloseable(F.blocking(mkTransportChannel(endpoint)))
.map { channel =>
new PubSubClient[F](project, FixedTransportChannelProvider.create(channel), credentials, endpoint)
(
Resource.fromAutoCloseable(F.blocking(mkTransportChannel(endpoint))),
Resource.fromAutoCloseable(F.blocking(mkMonitoringTransportChannel)))
.mapN { (channel, monitoringChannel) =>
new PubSubClient[F](
project = project,
channelProvider = FixedTransportChannelProvider.create(channel),
monitoringChannelProvider = FixedTransportChannelProvider.create(monitoringChannel),
credentials = credentials,
endpoint = endpoint
)
}

def unmanaged[F[_]](
project: String,
credentials: CredentialsProvider,
channelProvider: TransportChannelProvider,
monitoringChannelProvider: TransportChannelProvider,
endpoint: Option[String] = None
)(implicit F: Async[F]
): QueueClient[F] =
new PubSubClient[F](project, channelProvider, credentials, endpoint)
new PubSubClient[F](
project = project,
channelProvider = channelProvider,
monitoringChannelProvider = monitoringChannelProvider,
credentials = credentials,
endpoint = endpoint)

}
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,15 @@ private class PubSubStatsFetcher[F[_]](
})
}
.flatMap { response =>
val datapoints: List[Point] = response.getTimeSeries(0).getPointsList().asScala.toList
datapoints.sortBy(-_.getInterval().getEndTime().getSeconds()).headOption match {
case Some(value) =>
F.pure(QueueStats(value.getValue().getInt64Value().toInt, None, None))
case None =>
F.raiseError(MalformedQueueConfigurationException(queueName, "messages", "<missing>"))
if (response.getTimeSeriesCount == 0) F.pure(QueueStats(0, None, None))
else {
val datapoints: List[Point] = response.getTimeSeries(0).getPointsList().asScala.toList
datapoints.sortBy(-_.getInterval().getEndTime().getSeconds()).headOption match {
case Some(value) =>
F.pure(QueueStats(value.getValue().getInt64Value().toInt, None, None))
case None =>
F.raiseError(MalformedQueueConfigurationException(queueName, "messages", "<missing>"))
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.commercetools.queue.testkit
import cats.effect.std.{Env, Random}
import cats.effect.{IO, Resource, SyncIO}
import com.commercetools.queue.QueueClient
import munit.Location
import munit.catseffect.IOFixture

import scala.concurrent.duration._
Expand All @@ -26,7 +27,7 @@ abstract class QueueClientSuite
def booleanOrDefault(varName: String, default: Boolean): IO[Boolean] =
optBoolean(varName).map(_.getOrElse(default))

override def munitIOTimeout: Duration = 1.minute
override def munitIOTimeout: Duration = 10.minute

/** Override these if the given provider is not supporting these features */
val queueUpdateSupported: Boolean = true
Expand Down Expand Up @@ -54,12 +55,48 @@ abstract class QueueClientSuite

final def randomMessages(n: Int): IO[List[(String, Map[String, String])]] = for {
random <- Random.scalaUtilRandom[IO]
size <- random.nextIntBounded(n)
size <- random.nextIntBounded(n - 1).map(_ + 1) // > 0
} yield messages(size)

final def messages(n: Int): List[(String, Map[String, String])] =
List
.range(0, n)
.map(i => (i.toString, Map(s"metadata-$i-key" -> s"$i-value", s"metadata-$i-another-key" -> "another-value")))

def eventuallyIO[A, B](
obtained: IO[A],
returns: B,
clue: => Any = "values are not the same",
retries: Int = 200,
delay: FiniteDuration = 5.second
)(implicit
loc: Location,
ev: B <:< A
): IO[Unit] = for {
t <- IO.realTimeInstant
_ <- assertIO(obtained, returns, clue).handleErrorWith(err =>
if (retries > 0)
IO.println(s" ${t.toString} - retrying assertion after $delay, remaining retries: $retries") >>
IO.sleep(delay) >>
eventuallyIO(obtained, returns, clue, retries - 1, delay)
else IO.raiseError(err))
} yield ()

def eventuallyBoolean(
cond: IO[Boolean],
clue: => Any = "values are not the same",
retries: Int = 200,
delay: FiniteDuration = 5.second
)(implicit
loc: Location
): IO[Unit] = for {
t <- IO.realTimeInstant
_ <- assertIOBoolean(cond, clue).handleErrorWith(err =>
if (retries > 0)
IO.println(s" ${t.toString} - retrying assertion after $delay, remaining retries: $retries") >>
IO.sleep(delay) >>
eventuallyBoolean(cond, clue, retries - 1, delay)
else IO.raiseError(err))
} yield ()

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.commercetools.queue.testkit

import cats.effect.IO
import cats.effect.kernel.Ref
import munit.CatsEffectSuite
import fs2.Stream
import cats.syntax.all._
Expand All @@ -13,35 +15,48 @@ import scala.concurrent.duration.DurationInt
trait QueuePublisherSuite extends CatsEffectSuite { self: QueueClientSuite =>

withQueue.test("sink publishes all the messages") { queueName =>
assume(messagesStatsSupported)
val client = clientFixture()
for {
msgs <- randomMessages(30)
msgs <- randomMessages(10)
count <- Ref.of[IO, Int](0)
_ <- Stream
.emits(msgs)
.through(client.publish(queueName).sink(batchSize = 10))
.compile
.drain
messagesInQueue <- client.statistics(queueName).fetcher.use(_.fetch).map(_.messages)
_ = assertEquals(messagesInQueue, msgs.size)
_ <- client
.subscribe(queueName)
.processWithAutoAck(10, 10.seconds)(_ => count.update(_ + 1))
.take(msgs.size.toLong)
.compile
.drain
.timeout(30.seconds)
_ <- assertIO(count.get, msgs.size)
} yield ()
}

withQueue.test("sink publishes all the messages with a delay") { queueName =>
assume(messagesStatsSupported && delayedMessagesStatsSupported)
val client = clientFixture()
for {
msgs <- randomMessages(30)
msgs <- randomMessages(10)
_ <- Stream
.emits(msgs)
.through(client.publish(queueName).sink(batchSize = 10, delay = 1.minute.some))
.through(client.publish(queueName).sink(batchSize = 10, delay = 10.seconds.some))
.compile
.drain
statsFetcher = client.statistics(queueName).fetcher
messagesInQueue <- statsFetcher.use(_.fetch).map(_.messages)
delayedMessages <- statsFetcher.use(_.fetch).map(_.delayed)
_ = assertEquals(delayedMessages, msgs.size.some, "delayed messages are not what we expect")
_ = assertEquals(messagesInQueue, 0, "the queue is not empty")
_ <- client
.subscribe(queueName)
.puller
.use(puller =>
for {
_ <- eventuallyBoolean(
puller.pullBatch(1, 1.second).map(_.isEmpty),
"chunk is not empty, messages are not getting delayed")
_ <- IO.sleep(10.seconds)
_ <- eventuallyBoolean(
puller.pullBatch(1, 10.seconds).map(chunk => !chunk.isEmpty),
"got no messages after delay")
} yield ())
} yield ()
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.commercetools.queue.testkit

import cats.effect.IO
import cats.syntax.all._
import fs2.Stream
import munit.CatsEffectSuite
Expand All @@ -23,19 +22,12 @@ trait QueueStatisticsSuite extends CatsEffectSuite { self: QueueClientSuite =>
.through(client.publish(queueName).sink(batchSize = 10))
.compile
.drain
statsFetcher = client.statistics(queueName).fetcher
_ <- statsFetcher
.use(_.fetch)
.map(stats => assertEquals(stats.messages, messages.size, "Queue should be full"))
_ <- client
.subscribe(queueName)
.processWithAutoAck(batchSize = 10, waitingTime = 20.seconds)(_ => IO.unit)
.take(messages.size.toLong)
.compile
.drain
_ <- statsFetcher
.use(_.fetch)
.map(stats => assertEquals(stats.messages, 0, "Queue should be empty"))
.statistics(queueName)
.fetcher
.use { fetcher =>
eventuallyIO(fetcher.fetch.map(_.messages), messages.size, "Queue should be full")
}
} yield ()
}

Expand All @@ -53,8 +45,11 @@ trait QueueStatisticsSuite extends CatsEffectSuite { self: QueueClientSuite =>
.use { case (puller, statsFetcher) =>
for {
chunk <- puller.pullBatch(10, 10.seconds)
stats <- statsFetcher.fetch
} yield assertEquals(stats.inflight, chunk.size.some, "Inflight stats doesn't match pulled messages")
_ <- eventuallyIO(
statsFetcher.fetch.map(_.inflight),
chunk.size.some,
"Inflight stats doesn't match pulled messages")
} yield ()
}
} yield ()
}
Expand All @@ -73,8 +68,10 @@ trait QueueStatisticsSuite extends CatsEffectSuite { self: QueueClientSuite =>
.statistics(queueName)
.fetcher
.use { statsFetcher =>
statsFetcher.fetch.map(stats =>
assertEquals(stats.delayed, messages.size.some, "Delayed stats doesn't match pulled messages"))
eventuallyIO(
statsFetcher.fetch.map(_.delayed),
messages.size.some,
"Delayed stats doesn't match pulled messages")
}
} yield ()
}
Expand Down
Loading
Loading