Skip to content

Commit

Permalink
Upgrade amqp-go-client
Browse files Browse the repository at this point in the history
  • Loading branch information
mkuratczyk committed Nov 19, 2024
1 parent 0babb0e commit 71e1391
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 27 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,6 @@ require (
require (
github.com/go-stomp/stomp/v3 v3.1.3
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/rabbitmq/rabbitmq-amqp-go-client v0.0.0-20241001222512-5fc29f4968d0
github.com/rabbitmq/rabbitmq-amqp-go-client v0.0.0-20241115073728-60e006b2a387
github.com/spf13/pflag v1.0.5
)
43 changes: 23 additions & 20 deletions pkg/mgmt/mgmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,36 +26,38 @@ func Get() rmq.IManagement {
return management
}

func DeclareAndBind(cfg config.Config, queue string, id int) rmq.IQueueInfo {
func DeclareAndBind(cfg config.Config, queueName string, id int) rmq.IQueueInfo {
if cfg.Queues == config.Predeclared {
return nil
}

mgmt := Get()

var queueType rmq.TQueueType
var queueType rmq.QueueType
switch cfg.Queues {
case config.Classic:
queueType = rmq.Classic
queueType = rmq.QueueType{Type: rmq.Classic}
case config.Quorum:
queueType = rmq.Quorum
queueType = rmq.QueueType{Type: rmq.Quorum}
case config.Stream:
queueType = rmq.Stream
queueType = rmq.QueueType{Type: rmq.Stream}
}

queue = strings.TrimPrefix(queue, "/queues/")
queueSpec := mgmt.Queue(queue).QueueType(rmq.QueueType{Type: queueType})
qi, err := queueSpec.Declare(context.Background())
queueName = strings.TrimPrefix(queueName, "/queues/")
qi, err := mgmt.DeclareQueue(context.TODO(), &rmq.QueueSpecification{
Name: queueName,
QueueType: queueType,
})
if err != nil {
log.Error("Failed to declare queue", "name", queue, "error", err)
log.Error("Failed to declare queue", "name", queueName, "error", err)
os.Exit(1)
}
log.Debug("queue declared", "name", qi.GetName(), "type", qi.Type())
log.Debug("queue declared", "name", qi.Name(), "type", qi.Type())

if cfg.CleanupQueues {
// if we don't need to delete at the end, there's no point in tracking declared queues
// note: DeleteAll() is always called, so the empty list serves as the mechanism to skip deletion
declaredQueues = append(declaredQueues, queue)
declaredQueues = append(declaredQueues, queueName)
}

var exchangeName, routingKey string
Expand All @@ -72,14 +74,16 @@ func DeclareAndBind(cfg config.Config, queue string, id int) rmq.IQueueInfo {
}

if exchangeName != "amq.default" {
exchangeSpec := mgmt.Exchange(exchangeName)
bindingSpec := mgmt.Binding().SourceExchange(exchangeSpec).DestinationQueue(queueSpec).Key(routingKey)
err = bindingSpec.Bind(context.Background())
_, err = mgmt.Bind(context.TODO(), &rmq.BindingSpecification{
SourceExchange: exchangeName,
DestinationQueue: queueName,
BindingKey: routingKey,
})
if err != nil {
log.Error("Failed to bind a queue", "exchange", exchangeName, "queue", queue, "key", routingKey, "error", err)
log.Error("Failed to bind a queue", "exchange", exchangeName, "queue", queueName, "key", routingKey, "error", err)
os.Exit(1)
}
log.Debug("binding declared", "exchange", exchangeName, "queue", queue, "key", routingKey)
log.Debug("binding declared", "exchange", exchangeName, "queue", queueName, "key", routingKey)
}

return qi
Expand Down Expand Up @@ -130,10 +134,9 @@ func parsePublishTo(publishTo string, id int) (string, string) {
}

func DeleteDeclaredQueues() {
for _, queue := range declaredQueues {
queueSpec := Get().Queue(queue)
err := queueSpec.Delete(context.Background())
log.Debug("Deleted queue", "name", queue, "error", err)
for _, queueName := range declaredQueues {
err := Get().DeleteQueue(context.TODO(), queueName)
log.Debug("Deleted queue", "name", queueName, "error", err)
}
}

Expand Down
12 changes: 6 additions & 6 deletions pkg/mgmt/mgmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ var _ = Describe("DeclareAndBind", func() {
log.Setup()
cfg := config.Config{Queues: config.Classic, PublishTo: "/queues/mgmt-classic"}
q := mgmt.DeclareAndBind(cfg, "mgmt-classic", 0)
Expect(q.GetName()).To(Equal("mgmt-classic"))
Expect(q.Name()).To(Equal("mgmt-classic"))
Expect(string(q.Type())).To(Equal("classic"))
err := mgmt.Get().Queue("mgmt-classic").Delete(context.Background())
err := mgmt.Get().DeleteQueue(context.TODO(), "mgmt-classic")
Expect(err).To(BeNil())
// TOOD assert the binding
})
Expand All @@ -32,9 +32,9 @@ var _ = Describe("DeclareAndBind", func() {
log.Setup()
cfg := config.Config{Queues: config.Quorum, PublishTo: "/queues/mgmt-quorum"}
q := mgmt.DeclareAndBind(cfg, "mgmt-quorum", 0)
Expect(q.GetName()).To(Equal("mgmt-quorum"))
Expect(q.Name()).To(Equal("mgmt-quorum"))
Expect(string(q.Type())).To(Equal("quorum"))
err := mgmt.Get().Queue("mgmt-quorum").Delete(context.Background())
err := mgmt.Get().DeleteQueue(context.TODO(), "mgmt-quorum")
Expect(err).To(BeNil())
// TOOD assert the binding
})
Expand All @@ -43,9 +43,9 @@ var _ = Describe("DeclareAndBind", func() {
log.Setup()
cfg := config.Config{Queues: config.Stream, PublishTo: "/queues/mgmt-stream"}
q := mgmt.DeclareAndBind(cfg, "mgmt-stream", 0)
Expect(q.GetName()).To(Equal("mgmt-stream"))
Expect(q.Name()).To(Equal("mgmt-stream"))
Expect(string(q.Type())).To(Equal("stream"))
err := mgmt.Get().Queue("mgmt-stream").Delete(context.Background())
err := mgmt.Get().DeleteQueue(context.TODO(), "mgmt-stream")
Expect(err).To(BeNil())
// TOOD assert the binding
})
Expand Down

0 comments on commit 71e1391

Please sign in to comment.