diff --git a/gcp/pubsub/integration/src/test/scala/com/commercetools/queue/pubsub/PubSubClientSuite.scala b/gcp/pubsub/integration/src/test/scala/com/commercetools/queue/pubsub/PubSubClientSuite.scala index 99e6db6..3fe4ea9 100644 --- a/gcp/pubsub/integration/src/test/scala/com/commercetools/queue/pubsub/PubSubClientSuite.scala +++ b/gcp/pubsub/integration/src/test/scala/com/commercetools/queue/pubsub/PubSubClientSuite.scala @@ -18,9 +18,9 @@ package com.commercetools.queue.pubsub import cats.effect.{IO, Resource} import com.commercetools.queue.QueueClient -import com.commercetools.queue.gcp.pubsub.PubSubClient +import com.commercetools.queue.gcp.pubsub.{PubSubClient, PubSubConfig} import com.commercetools.queue.testkit.QueueClientSuite -import com.google.api.gax.core.{GoogleCredentialsProvider, NoCredentialsProvider} +import com.google.api.gax.core.{CredentialsProvider, GoogleCredentialsProvider, NoCredentialsProvider} import scala.concurrent.duration.{Duration, DurationInt} import scala.jdk.CollectionConverters._ @@ -39,9 +39,10 @@ class PubSubClientSuite extends QueueClientSuite { // stats require a long time to be propagated and be available override def munitIOTimeout: Duration = 15.minutes - private def config = + private def config: IO[(String, CredentialsProvider, Option[String], PubSubConfig)] = booleanOrDefault(isEmulatorEnvVar, default = isEmulatorDefault).ifM( - ifTrue = IO.pure(("test-project", NoCredentialsProvider.create(), Some("localhost:8042"))), + ifTrue = + IO.pure(("test-project", NoCredentialsProvider.create(), Some("localhost:8042"), PubSubConfig("test-suite"))), ifFalse = for { project <- string("GCP_PUBSUB_PROJECT") credentials = GoogleCredentialsProvider @@ -51,12 +52,12 @@ class PubSubClientSuite extends QueueClientSuite { "https://www.googleapis.com/auth/monitoring.read" // monitoring (for fetching stats) ).asJava) .build() - } yield (project, credentials, None) + } yield (project, credentials, None, PubSubConfig("test-suite")) ) override def client: Resource[IO, QueueClient[IO]] = - config.toResource.flatMap { case (project, credentials, endpoint) => - PubSubClient(project, credentials, endpoint = endpoint) + config.toResource.flatMap { case (project, credentials, endpoint, configs) => + PubSubClient(project, credentials, endpoint = endpoint, configs = configs) } } diff --git a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubAdministration.scala b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubAdministration.scala index f9347a2..61dbd74 100644 --- a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubAdministration.scala +++ b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubAdministration.scala @@ -31,7 +31,8 @@ private class PubSubAdministration[F[_]]( project: String, channelProvider: TransportChannelProvider, credentials: CredentialsProvider, - endpoint: Option[String] + endpoint: Option[String], + configs: PubSubConfig )(implicit F: Async[F]) extends UnsealedQueueAdministration[F] { @@ -72,7 +73,7 @@ private class PubSubAdministration[F[_]]( Subscription .newBuilder() .setTopic(topicName.toString()) - .setName(SubscriptionName.of(project, s"fs2-queue-$name").toString()) + .setName(SubscriptionName.of(project, s"${configs.subscriptionNamePrefix}-$name").toString()) .setAckDeadlineSeconds(lockTTL.toSeconds.toInt) .setMessageRetentionDuration(ttl) // An empty expiration policy (no TTL set) ensures the subscription is never deleted @@ -84,7 +85,7 @@ private class PubSubAdministration[F[_]]( .adaptError(makeQueueException(_, name)) override def update(name: String, messageTTL: Option[FiniteDuration], lockTTL: Option[FiniteDuration]): F[Unit] = { - val subscriptionName = SubscriptionName.of(project, s"fs2-queue-$name") + val subscriptionName = SubscriptionName.of(project, s"${configs.subscriptionNamePrefix}-$name") val updateSubscriptionRequest = (messageTTL, lockTTL) match { case (Some(messageTTL), Some(lockTTL)) => val mttl = Duration.newBuilder().setSeconds(messageTTL.toSeconds).build() @@ -150,7 +151,7 @@ private class PubSubAdministration[F[_]]( override def configuration(name: String): F[QueueConfiguration] = subscriptionClient.use { client => wrapFuture[F, Subscription](F.delay { - val subscriptionName = SubscriptionName.of(project, s"fs2-queue-$name") + val subscriptionName = SubscriptionName.of(project, s"${configs.subscriptionNamePrefix}-$name") client .getSubscriptionCallable() .futureCall(GetSubscriptionRequest.newBuilder().setSubscription(subscriptionName.toString()).build()) @@ -178,7 +179,7 @@ private class PubSubAdministration[F[_]]( .futureCall( DeleteSubscriptionRequest .newBuilder() - .setSubscription(SubscriptionName.of(project, s"fs2-queue-$name").toString()) + .setSubscription(SubscriptionName.of(project, s"${configs.subscriptionNamePrefix}-$name").toString()) .build()) }) }.void diff --git a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubClient.scala b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubClient.scala index efbe6eb..1a42840 100644 --- a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubClient.scala +++ b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubClient.scala @@ -30,16 +30,17 @@ private class PubSubClient[F[_]: Async] private ( channelProvider: TransportChannelProvider, monitoringChannelProvider: TransportChannelProvider, credentials: CredentialsProvider, - endpoint: Option[String]) + endpoint: Option[String], + configs: PubSubConfig) extends UnsealedQueueClient[F] { override def administration: QueueAdministration[F] = - new PubSubAdministration[F](project, channelProvider, credentials, endpoint) + new PubSubAdministration[F](project, channelProvider, credentials, endpoint, configs) override def statistics(name: String): QueueStatistics[F] = new PubSubStatistics( name, - SubscriptionName.of(project, s"fs2-queue-$name"), + SubscriptionName.of(project, s"${configs.subscriptionNamePrefix}-$name"), monitoringChannelProvider, credentials, endpoint) @@ -50,7 +51,7 @@ private class PubSubClient[F[_]: Async] private ( override def subscribe[T: Deserializer](name: String): QueueSubscriber[F, T] = new PubSubSubscriber[F, T]( name, - SubscriptionName.of(project, s"fs2-queue-$name"), + SubscriptionName.of(project, s"${configs.subscriptionNamePrefix}-$name"), channelProvider, credentials, endpoint) @@ -86,7 +87,8 @@ object PubSubClient { credentials: CredentialsProvider, endpoint: Option[String] = None, mkTransportChannel: Option[String] => GrpcTransportChannel = makeDefaultTransportChannel, - mkMonitoringTransportChannel: => GrpcTransportChannel = makeDefaultMonitoringTransportChannel + mkMonitoringTransportChannel: => GrpcTransportChannel = makeDefaultMonitoringTransportChannel, + configs: PubSubConfig = PubSubConfig.default )(implicit F: Async[F] ): Resource[F, QueueClient[F]] = ( @@ -98,7 +100,8 @@ object PubSubClient { channelProvider = FixedTransportChannelProvider.create(channel), monitoringChannelProvider = FixedTransportChannelProvider.create(monitoringChannel), credentials = credentials, - endpoint = endpoint + endpoint = endpoint, + configs = configs ) } @@ -107,7 +110,8 @@ object PubSubClient { credentials: CredentialsProvider, channelProvider: TransportChannelProvider, monitoringChannelProvider: TransportChannelProvider, - endpoint: Option[String] = None + endpoint: Option[String] = None, + configs: PubSubConfig = PubSubConfig.default )(implicit F: Async[F] ): QueueClient[F] = new PubSubClient[F]( @@ -115,6 +119,8 @@ object PubSubClient { channelProvider = channelProvider, monitoringChannelProvider = monitoringChannelProvider, credentials = credentials, - endpoint = endpoint) + endpoint = endpoint, + configs = configs + ) } diff --git a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubConfig.scala b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubConfig.scala new file mode 100644 index 0000000..3f0de2d --- /dev/null +++ b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubConfig.scala @@ -0,0 +1,23 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.commercetools.queue.gcp.pubsub + +case class PubSubConfig(subscriptionNamePrefix: String) + +object PubSubConfig { + val default: PubSubConfig = PubSubConfig("fs2-queue") +}