Skip to content

Commit

Permalink
Adapt errors from Azure to specific QueueException
Browse files Browse the repository at this point in the history
  • Loading branch information
satabin committed Mar 22, 2024
1 parent bb98221 commit eac9dc3
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package com.commercetools.queue.azure.servicebus

import cats.effect.Async
import cats.syntax.functor._
import cats.syntax.monadError._
import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClient
import com.azure.messaging.servicebus.administration.models.CreateQueueOptions
import com.commercetools.queue.QueueAdministration
Expand All @@ -36,11 +37,16 @@ class ServiceBusAdministration[F[_]](client: ServiceBusAdministrationClient)(imp
.setDefaultMessageTimeToLive(Duration.ofMillis(messageTTL.toMillis))
.setLockDuration(Duration.ofMillis(lockTTL.toMillis))))
.void
.adaptError(makeQueueException(_, name))

override def delete(name: String): F[Unit] =
F.blocking(client.deleteQueue(name)).void
F.blocking(client.deleteQueue(name))
.void
.adaptError(makeQueueException(_, name))

override def exists(name: String): F[Boolean] =
F.blocking(client.getQueueExists(name)).map(_.booleanValue)
F.blocking(client.getQueueExists(name))
.map(_.booleanValue)
.adaptError(makeQueueException(_, name))

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@

package com.commercetools.queue.azure.servicebus

import cats.effect.kernel.Async
import cats.syntax.all._
import cats.syntax.flatMap._
import cats.syntax.either._
import cats.syntax.monadError._
import cats.syntax.functor._
import cats.effect.Async
import com.azure.messaging.servicebus.ServiceBusReceiverClient
import com.commercetools.queue.{Deserializer, MessageContext, QueuePuller}
import fs2.Chunk
Expand All @@ -27,7 +30,8 @@ import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters._

class ServiceBusPuller[F[_], Data](
receiver: ServiceBusReceiverClient
receiver: ServiceBusReceiverClient,
queueName: String
)(implicit
F: Async[F],
deserializer: Deserializer[Data])
Expand All @@ -44,5 +48,7 @@ class ServiceBusPuller[F[_], Data](
new ServiceBusMessageContext(data, sbMessage, receiver)
})
}
.widen[Chunk[MessageContext[F, Data]]]
.adaptError(makePullQueueException(_, queueName))

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@ import java.time.ZoneOffset
import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters._

class ServiceBusPusher[F[_], Data](sender: ServiceBusSenderClient)(implicit serializer: Serializer[Data], F: Async[F])
class ServiceBusPusher[F[_], Data](
sender: ServiceBusSenderClient,
queueName: String
)(implicit
serializer: Serializer[Data],
F: Async[F])
extends QueuePusher[F, Data] {

override def push(message: Data, delay: Option[FiniteDuration]): F[Unit] = {
Expand All @@ -34,7 +39,9 @@ class ServiceBusPusher[F[_], Data](sender: ServiceBusSenderClient)(implicit seri
F.realTimeInstant
.map(now => sbMessage.setScheduledEnqueueTime(now.plusMillis(delay.toMillis).atOffset(ZoneOffset.UTC)))
} *>
F.blocking(sender.sendMessage(sbMessage)).void
F.blocking(sender.sendMessage(sbMessage))
.void
.adaptError(makePushQueueException(_, queueName))
}

override def push(messages: List[Data], delay: Option[FiniteDuration]): F[Unit] = {
Expand All @@ -46,7 +53,9 @@ class ServiceBusPusher[F[_], Data](sender: ServiceBusSenderClient)(implicit seri
}
}
} *>
F.blocking(sender.sendMessages(sbMessages.asJava)).void
F.blocking(sender.sendMessages(sbMessages.asJava))
.void
.adaptError(makePushQueueException(_, queueName))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,6 @@ class ServiceBusQueuePublisher[F[_], Data](
} { s =>
F.delay(s.close())
}
.map(new ServiceBusPusher(_))
.map(new ServiceBusPusher(_, queueName))

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class ServiceBusQueueSubscriber[F[_], Data](
}
}
.map { receiver =>
new ServiceBusPuller(receiver)
new ServiceBusPuller(receiver, name)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.commercetools.queue.azure

import com.azure.core.exception.{ResourceExistsException, ResourceNotFoundException}
import com.commercetools.queue.{QueueAlreadyExistException, QueueDoesNotExistException, QueueException, UnknownQueueException}
import com.commercetools.queue.CannotPushException
import com.commercetools.queue.CannotPullException
import com.commercetools.queue.CannotSettleException
import com.commercetools.queue.Settlement

package object servicebus {

def makeQueueException(t: Throwable, queueName: String): QueueException =
t match {
case _: ResourceNotFoundException => QueueDoesNotExistException(queueName, t)
case _: ResourceExistsException => QueueAlreadyExistException(queueName, t)
case t: QueueException => t
case _ => UnknownQueueException(queueName, t)
}

def makePushQueueException(t: Throwable, queueName: String): QueueException =
new CannotPushException(queueName, makeQueueException(t, queueName))

def makePullQueueException(t: Throwable, queueName: String): QueueException =
new CannotPullException(queueName, makeQueueException(t, queueName))

def makeSettlementException(t: Throwable, queueName: String, msgId: String, action: Settlement): QueueException =
new CannotSettleException(msgId = msgId, action = action, inner = makeQueueException(t, queueName))

}

0 comments on commit eac9dc3

Please sign in to comment.