From 71e13918679b4c2fe99ca159c6222e28bb63c397 Mon Sep 17 00:00:00 2001 From: Michal Kuratczyk Date: Tue, 19 Nov 2024 12:50:04 +0100 Subject: [PATCH] Upgrade amqp-go-client --- go.mod | 2 +- pkg/mgmt/mgmt.go | 43 +++++++++++++++++++++++-------------------- pkg/mgmt/mgmt_test.go | 12 ++++++------ 3 files changed, 30 insertions(+), 27 deletions(-) diff --git a/go.mod b/go.mod index d467721..171f734 100644 --- a/go.mod +++ b/go.mod @@ -53,6 +53,6 @@ require ( require ( github.com/go-stomp/stomp/v3 v3.1.3 github.com/inconshreveable/mousetrap v1.1.0 // indirect - github.com/rabbitmq/rabbitmq-amqp-go-client v0.0.0-20241001222512-5fc29f4968d0 + github.com/rabbitmq/rabbitmq-amqp-go-client v0.0.0-20241115073728-60e006b2a387 github.com/spf13/pflag v1.0.5 ) diff --git a/pkg/mgmt/mgmt.go b/pkg/mgmt/mgmt.go index 3521c77..a350d8f 100644 --- a/pkg/mgmt/mgmt.go +++ b/pkg/mgmt/mgmt.go @@ -26,36 +26,38 @@ func Get() rmq.IManagement { return management } -func DeclareAndBind(cfg config.Config, queue string, id int) rmq.IQueueInfo { +func DeclareAndBind(cfg config.Config, queueName string, id int) rmq.IQueueInfo { if cfg.Queues == config.Predeclared { return nil } mgmt := Get() - var queueType rmq.TQueueType + var queueType rmq.QueueType switch cfg.Queues { case config.Classic: - queueType = rmq.Classic + queueType = rmq.QueueType{Type: rmq.Classic} case config.Quorum: - queueType = rmq.Quorum + queueType = rmq.QueueType{Type: rmq.Quorum} case config.Stream: - queueType = rmq.Stream + queueType = rmq.QueueType{Type: rmq.Stream} } - queue = strings.TrimPrefix(queue, "/queues/") - queueSpec := mgmt.Queue(queue).QueueType(rmq.QueueType{Type: queueType}) - qi, err := queueSpec.Declare(context.Background()) + queueName = strings.TrimPrefix(queueName, "/queues/") + qi, err := mgmt.DeclareQueue(context.TODO(), &rmq.QueueSpecification{ + Name: queueName, + QueueType: queueType, + }) if err != nil { - log.Error("Failed to declare queue", "name", queue, "error", err) + log.Error("Failed to declare queue", "name", queueName, "error", err) os.Exit(1) } - log.Debug("queue declared", "name", qi.GetName(), "type", qi.Type()) + log.Debug("queue declared", "name", qi.Name(), "type", qi.Type()) if cfg.CleanupQueues { // if we don't need to delete at the end, there's no point in tracking declared queues // note: DeleteAll() is always called, so the empty list serves as the mechanism to skip deletion - declaredQueues = append(declaredQueues, queue) + declaredQueues = append(declaredQueues, queueName) } var exchangeName, routingKey string @@ -72,14 +74,16 @@ func DeclareAndBind(cfg config.Config, queue string, id int) rmq.IQueueInfo { } if exchangeName != "amq.default" { - exchangeSpec := mgmt.Exchange(exchangeName) - bindingSpec := mgmt.Binding().SourceExchange(exchangeSpec).DestinationQueue(queueSpec).Key(routingKey) - err = bindingSpec.Bind(context.Background()) + _, err = mgmt.Bind(context.TODO(), &rmq.BindingSpecification{ + SourceExchange: exchangeName, + DestinationQueue: queueName, + BindingKey: routingKey, + }) if err != nil { - log.Error("Failed to bind a queue", "exchange", exchangeName, "queue", queue, "key", routingKey, "error", err) + log.Error("Failed to bind a queue", "exchange", exchangeName, "queue", queueName, "key", routingKey, "error", err) os.Exit(1) } - log.Debug("binding declared", "exchange", exchangeName, "queue", queue, "key", routingKey) + log.Debug("binding declared", "exchange", exchangeName, "queue", queueName, "key", routingKey) } return qi @@ -130,10 +134,9 @@ func parsePublishTo(publishTo string, id int) (string, string) { } func DeleteDeclaredQueues() { - for _, queue := range declaredQueues { - queueSpec := Get().Queue(queue) - err := queueSpec.Delete(context.Background()) - log.Debug("Deleted queue", "name", queue, "error", err) + for _, queueName := range declaredQueues { + err := Get().DeleteQueue(context.TODO(), queueName) + log.Debug("Deleted queue", "name", queueName, "error", err) } } diff --git a/pkg/mgmt/mgmt_test.go b/pkg/mgmt/mgmt_test.go index b94ef7a..f331d98 100644 --- a/pkg/mgmt/mgmt_test.go +++ b/pkg/mgmt/mgmt_test.go @@ -21,9 +21,9 @@ var _ = Describe("DeclareAndBind", func() { log.Setup() cfg := config.Config{Queues: config.Classic, PublishTo: "/queues/mgmt-classic"} q := mgmt.DeclareAndBind(cfg, "mgmt-classic", 0) - Expect(q.GetName()).To(Equal("mgmt-classic")) + Expect(q.Name()).To(Equal("mgmt-classic")) Expect(string(q.Type())).To(Equal("classic")) - err := mgmt.Get().Queue("mgmt-classic").Delete(context.Background()) + err := mgmt.Get().DeleteQueue(context.TODO(), "mgmt-classic") Expect(err).To(BeNil()) // TOOD assert the binding }) @@ -32,9 +32,9 @@ var _ = Describe("DeclareAndBind", func() { log.Setup() cfg := config.Config{Queues: config.Quorum, PublishTo: "/queues/mgmt-quorum"} q := mgmt.DeclareAndBind(cfg, "mgmt-quorum", 0) - Expect(q.GetName()).To(Equal("mgmt-quorum")) + Expect(q.Name()).To(Equal("mgmt-quorum")) Expect(string(q.Type())).To(Equal("quorum")) - err := mgmt.Get().Queue("mgmt-quorum").Delete(context.Background()) + err := mgmt.Get().DeleteQueue(context.TODO(), "mgmt-quorum") Expect(err).To(BeNil()) // TOOD assert the binding }) @@ -43,9 +43,9 @@ var _ = Describe("DeclareAndBind", func() { log.Setup() cfg := config.Config{Queues: config.Stream, PublishTo: "/queues/mgmt-stream"} q := mgmt.DeclareAndBind(cfg, "mgmt-stream", 0) - Expect(q.GetName()).To(Equal("mgmt-stream")) + Expect(q.Name()).To(Equal("mgmt-stream")) Expect(string(q.Type())).To(Equal("stream")) - err := mgmt.Get().Queue("mgmt-stream").Delete(context.Background()) + err := mgmt.Get().DeleteQueue(context.TODO(), "mgmt-stream") Expect(err).To(BeNil()) // TOOD assert the binding })