Skip to content

Commit

Permalink
Adapt errors from AWS to specific QueueException
Browse files Browse the repository at this point in the history
  • Loading branch information
satabin committed Mar 22, 2024
1 parent f58aff2 commit bb98221
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,28 @@ class SQSAdministration[F[_]](client: SqsAsyncClient, getQueueUrl: String => F[S
.build())
}
}.void
.adaptError(makeQueueException(_, name))

override def delete(name: String): F[Unit] =
getQueueUrl(name).flatMap { queueUrl =>
F.fromCompletableFuture {
F.delay {
client.deleteQueue(
DeleteQueueRequest
.builder()
.queueUrl(queueUrl)
.build())
getQueueUrl(name)
.flatMap { queueUrl =>
F.fromCompletableFuture {
F.delay {
client.deleteQueue(
DeleteQueueRequest
.builder()
.queueUrl(queueUrl)
.build())
}
}
}
}.void
.void
.adaptError(makeQueueException(_, name))

override def exists(name: String): F[Boolean] =
getQueueUrl(name).as(true).recover { case _: QueueDoesNotExistException => false }
getQueueUrl(name)
.as(true)
.recover { case _: QueueDoesNotExistException => false }
.adaptError(makeQueueException(_, name))

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package com.commercetools.queue.aws.sqs

import cats.effect.{Async, Resource}
import cats.syntax.functor._
import cats.syntax.monadError._
import com.commercetools.queue.{Deserializer, QueueAdministration, QueueClient, QueuePublisher, QueueSubscriber, Serializer}
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
import software.amazon.awssdk.http.async.SdkAsyncHttpClient
Expand All @@ -35,15 +36,16 @@ class SQSClient[F[_]] private (client: SqsAsyncClient)(implicit F: Async[F]) ext
client.getQueueUrl(GetQueueUrlRequest.builder().queueName(name).build())
}
}.map(_.queueUrl)
.adaptError(makeQueueException(_, name))

override def administration: QueueAdministration[F] =
new SQSAdministration(client, getQueueUrl(_))

override def publish[T: Serializer](name: String): QueuePublisher[F, T] =
new SQSPublisher(client, getQueueUrl(name))
new SQSPublisher(client, name, getQueueUrl(name))

override def subscribe[T: Deserializer](name: String): QueueSubscriber[F, T] =
new SQSSubscriber[F, T](getQueueUrl(name), client)
new SQSSubscriber[F, T](client, name, getQueueUrl(name))

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ package com.commercetools.queue.aws.sqs

import cats.effect.Async
import cats.syntax.functor._
import com.commercetools.queue.MessageContext
import cats.syntax.monadError._
import com.commercetools.queue.{MessageContext, Settlement}
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.{ChangeMessageVisibilityRequest, DeleteMessageRequest}

Expand All @@ -31,6 +32,7 @@ class SQSMessageContext[F[_], T](
receiptHandle: String,
val messageId: String,
lockTTL: Int,
queueName: String,
queueUrl: String,
client: SqsAsyncClient
)(implicit F: Async[F])
Expand All @@ -42,6 +44,7 @@ class SQSMessageContext[F[_], T](
client.deleteMessage(DeleteMessageRequest.builder().queueUrl(queueUrl).receiptHandle(receiptHandle).build())
}
}.void
.adaptError(makeSettlementException(_, queueName, messageId, Settlement.Ack))

override def nack(): F[Unit] =
F.fromCompletableFuture {
Expand All @@ -55,6 +58,7 @@ class SQSMessageContext[F[_], T](
.build())
}
}.void
.adaptError(makeSettlementException(_, queueName, messageId, Settlement.Nack))

override def extendLock(): F[Unit] =
F.fromCompletableFuture {
Expand All @@ -68,5 +72,6 @@ class SQSMessageContext[F[_], T](
.build())
}
}.void
.adaptError(makeSettlementException(_, queueName, messageId, Settlement.ExtendLock))

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ import software.amazon.awssdk.services.sqs.SqsAsyncClient

class SQSPublisher[F[_], T](
client: SqsAsyncClient,
queueName: String,
getQueueUrl: F[String]
)(implicit
F: Async[F],
serializer: Serializer[T])
extends QueuePublisher[F, T] {

override def pusher: Resource[F, QueuePusher[F, T]] =
Resource.eval(getQueueUrl).map(new SQSPusher(client, _))
Resource.eval(getQueueUrl).map(new SQSPusher(client, queueName, _))

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
package com.commercetools.queue.aws.sqs

import cats.effect.Async
import cats.syntax.all._
import cats.syntax.either._
import cats.syntax.flatMap._
import cats.syntax.functor._
import cats.syntax.monadError._
import com.commercetools.queue.{Deserializer, MessageContext, QueuePuller}
import fs2.Chunk
import software.amazon.awssdk.services.sqs.SqsAsyncClient
Expand All @@ -29,6 +32,7 @@ import scala.jdk.CollectionConverters._

class SQSPuller[F[_], T](
client: SqsAsyncClient,
queueName: String,
queueUrl: String,
lockTTL: Int
)(implicit
Expand All @@ -50,19 +54,24 @@ class SQSPuller[F[_], T](
.build())
}
}.flatMap { response =>
Chunk.iterator(response.messages().iterator().asScala).traverse { message =>
deserializer.deserialize(message.body()).liftTo[F].map { payload =>
new SQSMessageContext(
payload,
Instant.ofEpochMilli(message.attributes().get(MessageSystemAttributeName.SENT_TIMESTAMP).toLong),
message.attributesAsStrings().asScala.toMap,
message.receiptHandle(),
message.messageId(),
lockTTL,
queueUrl,
client
)
Chunk
.iterator(response.messages().iterator().asScala)
.traverse { message =>
deserializer.deserialize(message.body()).liftTo[F].map { payload =>
new SQSMessageContext(
payload = payload,
enqueuedAt =
Instant.ofEpochMilli(message.attributes().get(MessageSystemAttributeName.SENT_TIMESTAMP).toLong),
metadata = message.attributesAsStrings().asScala.toMap,
receiptHandle = message.receiptHandle(),
messageId = message.messageId(),
lockTTL = lockTTL,
queueName = queueName,
queueUrl = queueUrl,
client = client
)
}
}
}
}
}.widen[Chunk[MessageContext[F, T]]]
.adaptError(makePullQueueException(_, queueName))
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,21 @@ package com.commercetools.queue.aws.sqs

import cats.effect.Async
import cats.syntax.functor._
import cats.syntax.monadError._
import com.commercetools.queue.{QueuePusher, Serializer}
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.{SendMessageBatchRequest, SendMessageBatchRequestEntry, SendMessageRequest}

import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters._

class SQSPusher[F[_], T](client: SqsAsyncClient, queueUrl: String)(implicit serializer: Serializer[T], F: Async[F])
class SQSPusher[F[_], T](
client: SqsAsyncClient,
queueName: String,
queueUrl: String
)(implicit
serializer: Serializer[T],
F: Async[F])
extends QueuePusher[F, T] {

override def push(message: T, delay: Option[FiniteDuration]): F[Unit] =
Expand All @@ -40,6 +47,7 @@ class SQSPusher[F[_], T](client: SqsAsyncClient, queueUrl: String)(implicit seri
.build())
}
}.void
.adaptError(makePushQueueException(_, queueName))

override def push(messages: List[T], delay: Option[FiniteDuration]): F[Unit] =
F.fromCompletableFuture {
Expand All @@ -59,5 +67,6 @@ class SQSPusher[F[_], T](client: SqsAsyncClient, queueUrl: String)(implicit seri
.build())
}
}.void
.adaptError(makePushQueueException(_, queueName))

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.{GetQueueAttributesRequest, QueueAttributeName}

class SQSSubscriber[F[_], T](
getQueueUrl: F[String],
client: SqsAsyncClient
client: SqsAsyncClient,
queueName: String,
getQueueUrl: F[String]
)(implicit
F: Async[F],
deserializer: Deserializer[T])
Expand All @@ -41,13 +42,14 @@ class SQSSubscriber[F[_], T](
.build())
}
}.map(_.attributes().get(QueueAttributeName.VISIBILITY_TIMEOUT).toInt)
.adaptError(makeQueueException(_, queueName))

override def puller: Resource[F, QueuePuller[F, T]] =
Resource.eval {
for {
queueUrl <- getQueueUrl
lockTTL <- getLockTTL(queueUrl)
} yield new SQSPuller(client, queueUrl, lockTTL)
} yield new SQSPuller(client, queueName, queueUrl, lockTTL)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2024 Commercetools GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.commercetools.queue.aws

import com.commercetools.queue.{CannotPullException, CannotPushException, CannotSettleException, QueueAlreadyExistException, QueueDoesNotExistException, QueueException, Settlement, UnknownQueueException}
import software.amazon.awssdk.services.sqs.model.{QueueDoesNotExistException => AwsQueueDoesNotExistException, QueueNameExistsException}

package object sqs {

def makeQueueException(t: Throwable, queueName: String): QueueException = t match {
case _: AwsQueueDoesNotExistException => QueueDoesNotExistException(queueName, t)
case _: QueueNameExistsException => QueueAlreadyExistException(queueName, t)
case t: QueueException => t
case _ => UnknownQueueException(queueName, t)
}

def makePushQueueException(t: Throwable, queueName: String): QueueException =
new CannotPushException(queueName, makeQueueException(t, queueName))

def makePullQueueException(t: Throwable, queueName: String): QueueException =
new CannotPullException(queueName, makeQueueException(t, queueName))

def makeSettlementException(t: Throwable, queueName: String, msgId: String, action: Settlement): QueueException =
new CannotSettleException(msgId = msgId, action = action, inner = makeQueueException(t, queueName))

}

0 comments on commit bb98221

Please sign in to comment.