Skip to content

Commit

Permalink
Don't re-declare queues
Browse files Browse the repository at this point in the history
If multiple publishers/consumers use the same queue,
it should be declared only once.
  • Loading branch information
mkuratczyk committed Dec 6, 2024
1 parent c87006f commit a01b262
Showing 1 changed file with 10 additions and 5 deletions.
15 changes: 10 additions & 5 deletions pkg/mgmt/mgmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,19 @@ var (
type Mgmt struct {
ctx context.Context
conn rmq.IConnection
declaredQueues []string
declaredQueues map[string]bool
uris []string
cleanupQueues bool
}

func Start(ctx context.Context, uris []string, cleanupQueues bool) *Mgmt {
once.Do(func() {
instance = &Mgmt{ctx: ctx, uris: uris, cleanupQueues: cleanupQueues}
instance = &Mgmt{
ctx: ctx,
uris: uris,
cleanupQueues: cleanupQueues,
declaredQueues: make(map[string]bool),
}
})
return instance
}
Expand Down Expand Up @@ -103,7 +108,7 @@ func (m *Mgmt) DeclareQueues(cfg config.Config) {
}

func (m *Mgmt) DeclareAndBind(cfg config.Config, queueName string, id int) rmq.IQueueInfo {
if cfg.Queues == config.Predeclared {
if cfg.Queues == config.Predeclared || m.declaredQueues[queueName] {
return nil
}

Expand Down Expand Up @@ -133,7 +138,7 @@ func (m *Mgmt) DeclareAndBind(cfg config.Config, queueName string, id int) rmq.I
log.Debug("queue declared", "name", qi.Name(), "type", qi.Type())

if m.cleanupQueues {
m.declaredQueues = append(m.declaredQueues, queueName)
m.declaredQueues[queueName] = true
}

exchangeName, routingKey := parsePublishTo(cfg.PublisherProto, cfg.PublishTo, id)
Expand Down Expand Up @@ -230,7 +235,7 @@ func (m *Mgmt) DeleteDeclaredQueues() {
}

log.Info("Deleting queues...")
for _, queueName := range m.declaredQueues {
for queueName := range m.declaredQueues {
if m.conn == nil || m.conn.Status() != rmq.Open {
log.Info("Management connection lost; some queues were not deleted")
return
Expand Down

0 comments on commit a01b262

Please sign in to comment.