Skip to content

Commit

Permalink
fix: goroutine leak in publisher on close
Browse files Browse the repository at this point in the history
- closing publisher did not close all associated goroutines. The
routine listening for block events of connection only got terminated
on closing the connection.
The wrapping connection manager holds all blocking channels and only
passes down an universal blocking channel now. If a signal reaches this
channel it is broadcasted to all blocking channels. This allows telling
the connection manager to remove and close the channel if publisher is
closed.

Closes #149
  • Loading branch information
magictucura committed Dec 16, 2024
1 parent c8749e5 commit 4a0be05
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 14 deletions.
28 changes: 19 additions & 9 deletions internal/connectionmanager/connection_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ type ConnectionManager struct {
reconnectionCount uint
reconnectionCountMu *sync.Mutex
dispatcher *dispatcher.Dispatcher

// universalNotifyBlockingReceiver receives block signal from underlying
// connection which are broadcasted to all publisherNotifyBlockingReceivers
universalNotifyBlockingReceiver chan amqp.Blocking
universalNotifyBlockingReceiverUsed bool
publisherNotifyBlockingReceiversMu *sync.RWMutex
publisherNotifyBlockingReceivers []chan amqp.Blocking
}

type Resolver interface {
Expand Down Expand Up @@ -62,17 +69,20 @@ func NewConnectionManager(resolver Resolver, conf amqp.Config, log logger.Logger
}

connManager := ConnectionManager{
logger: log,
resolver: resolver,
connection: conn,
amqpConfig: conf,
connectionMu: &sync.RWMutex{},
ReconnectInterval: reconnectInterval,
reconnectionCount: 0,
reconnectionCountMu: &sync.Mutex{},
dispatcher: dispatcher.NewDispatcher(),
logger: log,
resolver: resolver,
connection: conn,
amqpConfig: conf,
connectionMu: &sync.RWMutex{},
ReconnectInterval: reconnectInterval,
reconnectionCount: 0,
reconnectionCountMu: &sync.Mutex{},
dispatcher: dispatcher.NewDispatcher(),
universalNotifyBlockingReceiver: make(chan amqp.Blocking),
publisherNotifyBlockingReceiversMu: &sync.RWMutex{},
}
go connManager.startNotifyClose()
go connManager.readUniversalBlockReceiver()
return &connManager, nil
}

Expand Down
43 changes: 38 additions & 5 deletions internal/connectionmanager/safe_wraps.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,43 @@ import (
func (connManager *ConnectionManager) NotifyBlockedSafe(
receiver chan amqp.Blocking,
) chan amqp.Blocking {
connManager.connectionMu.RLock()
defer connManager.connectionMu.RUnlock()
connManager.connectionMu.Lock()
defer connManager.connectionMu.Unlock()

return connManager.connection.NotifyBlocked(
receiver,
)
// add receiver to connection manager.
connManager.publisherNotifyBlockingReceiversMu.Lock()
connManager.publisherNotifyBlockingReceivers = append(connManager.publisherNotifyBlockingReceivers, receiver)
connManager.publisherNotifyBlockingReceiversMu.Unlock()

if !connManager.universalNotifyBlockingReceiverUsed {
connManager.connection.NotifyBlocked(
connManager.universalNotifyBlockingReceiver,
)
connManager.universalNotifyBlockingReceiverUsed = true
}

return receiver
}

// readUniversalBlockReceiver reads on universal blocking receiver and broadcasts event to all blocking receivers of
// connection manager.
func (connManager *ConnectionManager) readUniversalBlockReceiver() {
for b := range connManager.universalNotifyBlockingReceiver {
connManager.publisherNotifyBlockingReceiversMu.RLock()
for _, br := range connManager.publisherNotifyBlockingReceivers {
br <- b
}
connManager.publisherNotifyBlockingReceiversMu.RUnlock()
}
}

func (connManager *ConnectionManager) RemovePublisherBlockingReceiver(receiver chan amqp.Blocking) {
connManager.publisherNotifyBlockingReceiversMu.Lock()
for i, br := range connManager.publisherNotifyBlockingReceivers {
if br == receiver {
connManager.publisherNotifyBlockingReceivers = append(connManager.publisherNotifyBlockingReceivers[:i], connManager.publisherNotifyBlockingReceivers[i+1:]...)
}
}
connManager.publisherNotifyBlockingReceiversMu.Unlock()
close(receiver)
}
3 changes: 3 additions & 0 deletions publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ type Publisher struct {
notifyPublishHandler func(p Confirmation)

options PublisherOptions

blockings chan amqp.Blocking
}

type PublisherConfirmation []*amqp.DeferredConfirmation
Expand Down Expand Up @@ -286,6 +288,7 @@ func (publisher *Publisher) Close() {
publisher.options.Logger.Warnf("error while closing the channel: %v", err)
}
publisher.options.Logger.Infof("closing publisher...")
publisher.connManager.RemovePublisherBlockingReceiver(publisher.blockings)
go func() {
publisher.closeConnectionToManagerCh <- struct{}{}
}()
Expand Down
1 change: 1 addition & 0 deletions publish_flow_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func (publisher *Publisher) startNotifyFlowHandler() {
func (publisher *Publisher) startNotifyBlockedHandler() {
blockings := publisher.connManager.NotifyBlockedSafe(make(chan amqp.Blocking))
publisher.disablePublishDueToBlockedMu.Lock()
publisher.blockings = blockings
publisher.disablePublishDueToBlocked = false
publisher.disablePublishDueToBlockedMu.Unlock()

Expand Down

0 comments on commit 4a0be05

Please sign in to comment.