Skip to content

Commit

Permalink
feat: adding prefix for Google PubSub subscriptions instead of the ha…
Browse files Browse the repository at this point in the history
…rdcoded one
  • Loading branch information
KristianLentinoCT committed Oct 21, 2024
1 parent 000577c commit 8d82df0
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ package com.commercetools.queue.pubsub

import cats.effect.{IO, Resource}
import com.commercetools.queue.QueueClient
import com.commercetools.queue.gcp.pubsub.PubSubClient
import com.commercetools.queue.gcp.pubsub.{PubSubClient, PubSubConfig}
import com.commercetools.queue.testkit.QueueClientSuite
import com.google.api.gax.core.{GoogleCredentialsProvider, NoCredentialsProvider}
import com.google.api.gax.core.{CredentialsProvider, GoogleCredentialsProvider, NoCredentialsProvider}

import scala.concurrent.duration.{Duration, DurationInt}
import scala.jdk.CollectionConverters._
Expand All @@ -39,9 +39,10 @@ class PubSubClientSuite extends QueueClientSuite {
// stats require a long time to be propagated and be available
override def munitIOTimeout: Duration = 15.minutes

private def config =
private def config: IO[(String, CredentialsProvider, Option[String], PubSubConfig)] =
booleanOrDefault(isEmulatorEnvVar, default = isEmulatorDefault).ifM(
ifTrue = IO.pure(("test-project", NoCredentialsProvider.create(), Some("localhost:8042"))),
ifTrue =
IO.pure(("test-project", NoCredentialsProvider.create(), Some("localhost:8042"), PubSubConfig("test-suite"))),
ifFalse = for {
project <- string("GCP_PUBSUB_PROJECT")
credentials = GoogleCredentialsProvider
Expand All @@ -51,12 +52,12 @@ class PubSubClientSuite extends QueueClientSuite {
"https://www.googleapis.com/auth/monitoring.read" // monitoring (for fetching stats)
).asJava)
.build()
} yield (project, credentials, None)
} yield (project, credentials, None, PubSubConfig("test-suite"))
)

override def client: Resource[IO, QueueClient[IO]] =
config.toResource.flatMap { case (project, credentials, endpoint) =>
PubSubClient(project, credentials, endpoint = endpoint)
config.toResource.flatMap { case (project, credentials, endpoint, configs) =>
PubSubClient(project, credentials, endpoint = endpoint, configs = configs)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ private class PubSubAdministration[F[_]](
project: String,
channelProvider: TransportChannelProvider,
credentials: CredentialsProvider,
endpoint: Option[String]
endpoint: Option[String],
configs: PubSubConfig
)(implicit F: Async[F])
extends UnsealedQueueAdministration[F] {

Expand Down Expand Up @@ -72,7 +73,7 @@ private class PubSubAdministration[F[_]](
Subscription
.newBuilder()
.setTopic(topicName.toString())
.setName(SubscriptionName.of(project, s"fs2-queue-$name").toString())
.setName(SubscriptionName.of(project, s"${configs.subscriptionNamePrefix}-$name").toString())
.setAckDeadlineSeconds(lockTTL.toSeconds.toInt)
.setMessageRetentionDuration(ttl)
// An empty expiration policy (no TTL set) ensures the subscription is never deleted
Expand All @@ -84,7 +85,7 @@ private class PubSubAdministration[F[_]](
.adaptError(makeQueueException(_, name))

override def update(name: String, messageTTL: Option[FiniteDuration], lockTTL: Option[FiniteDuration]): F[Unit] = {
val subscriptionName = SubscriptionName.of(project, s"fs2-queue-$name")
val subscriptionName = SubscriptionName.of(project, s"${configs.subscriptionNamePrefix}-$name")
val updateSubscriptionRequest = (messageTTL, lockTTL) match {
case (Some(messageTTL), Some(lockTTL)) =>
val mttl = Duration.newBuilder().setSeconds(messageTTL.toSeconds).build()
Expand Down Expand Up @@ -150,7 +151,7 @@ private class PubSubAdministration[F[_]](
override def configuration(name: String): F[QueueConfiguration] =
subscriptionClient.use { client =>
wrapFuture[F, Subscription](F.delay {
val subscriptionName = SubscriptionName.of(project, s"fs2-queue-$name")
val subscriptionName = SubscriptionName.of(project, s"${configs.subscriptionNamePrefix}-$name")
client
.getSubscriptionCallable()
.futureCall(GetSubscriptionRequest.newBuilder().setSubscription(subscriptionName.toString()).build())
Expand Down Expand Up @@ -178,7 +179,7 @@ private class PubSubAdministration[F[_]](
.futureCall(
DeleteSubscriptionRequest
.newBuilder()
.setSubscription(SubscriptionName.of(project, s"fs2-queue-$name").toString())
.setSubscription(SubscriptionName.of(project, s"${configs.subscriptionNamePrefix}-$name").toString())
.build())
})
}.void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,17 @@ private class PubSubClient[F[_]: Async] private (
channelProvider: TransportChannelProvider,
monitoringChannelProvider: TransportChannelProvider,
credentials: CredentialsProvider,
endpoint: Option[String])
endpoint: Option[String],
configs: PubSubConfig)
extends UnsealedQueueClient[F] {

override def administration: QueueAdministration[F] =
new PubSubAdministration[F](project, channelProvider, credentials, endpoint)
new PubSubAdministration[F](project, channelProvider, credentials, endpoint, configs)

override def statistics(name: String): QueueStatistics[F] =
new PubSubStatistics(
name,
SubscriptionName.of(project, s"fs2-queue-$name"),
SubscriptionName.of(project, s"${configs.subscriptionNamePrefix}-$name"),
monitoringChannelProvider,
credentials,
endpoint)
Expand All @@ -50,7 +51,7 @@ private class PubSubClient[F[_]: Async] private (
override def subscribe[T: Deserializer](name: String): QueueSubscriber[F, T] =
new PubSubSubscriber[F, T](
name,
SubscriptionName.of(project, s"fs2-queue-$name"),
SubscriptionName.of(project, s"${configs.subscriptionNamePrefix}-$name"),
channelProvider,
credentials,
endpoint)
Expand Down Expand Up @@ -86,7 +87,8 @@ object PubSubClient {
credentials: CredentialsProvider,
endpoint: Option[String] = None,
mkTransportChannel: Option[String] => GrpcTransportChannel = makeDefaultTransportChannel,
mkMonitoringTransportChannel: => GrpcTransportChannel = makeDefaultMonitoringTransportChannel
mkMonitoringTransportChannel: => GrpcTransportChannel = makeDefaultMonitoringTransportChannel,
configs: PubSubConfig = PubSubConfig.default
)(implicit F: Async[F]
): Resource[F, QueueClient[F]] =
(
Expand All @@ -98,7 +100,8 @@ object PubSubClient {
channelProvider = FixedTransportChannelProvider.create(channel),
monitoringChannelProvider = FixedTransportChannelProvider.create(monitoringChannel),
credentials = credentials,
endpoint = endpoint
endpoint = endpoint,
configs = configs
)
}

Expand All @@ -107,14 +110,17 @@ object PubSubClient {
credentials: CredentialsProvider,
channelProvider: TransportChannelProvider,
monitoringChannelProvider: TransportChannelProvider,
endpoint: Option[String] = None
endpoint: Option[String] = None,
configs: PubSubConfig = PubSubConfig.default
)(implicit F: Async[F]
): QueueClient[F] =
new PubSubClient[F](
project = project,
channelProvider = channelProvider,
monitoringChannelProvider = monitoringChannelProvider,
credentials = credentials,
endpoint = endpoint)
endpoint = endpoint,
configs = configs
)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2024 Commercetools GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.commercetools.queue.gcp.pubsub

case class PubSubConfig(subscriptionNamePrefix: String)

object PubSubConfig {
val default: PubSubConfig = PubSubConfig("fs2-queue")
}

0 comments on commit 8d82df0

Please sign in to comment.